From 77c7b724ebc1f6a3a223a4da6337619294560f86 Mon Sep 17 00:00:00 2001 From: Francesco Date: Fri, 16 Feb 2024 19:14:10 +0100 Subject: [PATCH] Reviewed flushing and compaction --- build.gradle | 5 +- .../tomfran/lsm/tree/LSMTreeAddBenchmark.java | 4 +- .../tomfran/lsm/tree/LSMTreeGetBenchmark.java | 4 +- .../com/tomfran/lsm/utils/BenchmarkUtils.java | 4 +- src/main/java/com/tomfran/lsm/Main.java | 49 ++++-- .../java/com/tomfran/lsm/sstable/SSTable.java | 160 +++++++++++------- .../java/com/tomfran/lsm/tree/LSMTree.java | 118 +++++++++---- .../tomfran/lsm/sstable/SSTableMergeTest.java | 11 +- .../lsm/sstable/SSTableReconstructTest.java | 7 +- .../com/tomfran/lsm/sstable/SSTableTest.java | 7 +- .../com/tomfran/lsm/tree/LSMTreeTest.java | 2 +- .../lsm/utils/UniqueSortedIteratorTest.java | 1 - 12 files changed, 236 insertions(+), 136 deletions(-) diff --git a/build.gradle b/build.gradle index c798211..ecc29dc 100644 --- a/build.gradle +++ b/build.gradle @@ -26,9 +26,10 @@ tasks.test { jmh { fork = 1 - warmupIterations = 1 - iterations = 5 + warmupIterations = 3 + iterations = 10 benchmarkMode = ['thrpt'] + includes = ['LSMTreeAddBenchmark*'] jmhTimeout = '15s' jmhVersion = '1.37' resultFormat = 'JSON' diff --git a/src/jmh/java/com/tomfran/lsm/tree/LSMTreeAddBenchmark.java b/src/jmh/java/com/tomfran/lsm/tree/LSMTreeAddBenchmark.java index f2d876e..9c09361 100644 --- a/src/jmh/java/com/tomfran/lsm/tree/LSMTreeAddBenchmark.java +++ b/src/jmh/java/com/tomfran/lsm/tree/LSMTreeAddBenchmark.java @@ -13,8 +13,8 @@ public class LSMTreeAddBenchmark { static final Path DIR = Path.of("tree_add_benchmark"); static final int NUM_ITEMS = 1000000; - static final int MEMTABLE_SIZE = 1 << 18; - static final int LEVEL_SIZE = 5; + static final int MEMTABLE_SIZE = 1024 * 1024 * 256; + static final int LEVEL_SIZE = 2; static int index = 0; LSMTree tree; diff --git a/src/jmh/java/com/tomfran/lsm/tree/LSMTreeGetBenchmark.java b/src/jmh/java/com/tomfran/lsm/tree/LSMTreeGetBenchmark.java index 025ac6a..cc6abff 100644 --- a/src/jmh/java/com/tomfran/lsm/tree/LSMTreeGetBenchmark.java +++ b/src/jmh/java/com/tomfran/lsm/tree/LSMTreeGetBenchmark.java @@ -16,8 +16,8 @@ public class LSMTreeGetBenchmark { static final Path DIR = Path.of("tree_get_benchmark"); static final int NUM_ITEMS = 1000000; - static final int MEMTABLE_SIZE = 1 << 18; - static final int LEVEL_SIZE = 5; + static final int MEMTABLE_SIZE = 1024 * 1024 * 256; + static final int LEVEL_SIZE = 4; static int index = 0; LSMTree tree; diff --git a/src/jmh/java/com/tomfran/lsm/utils/BenchmarkUtils.java b/src/jmh/java/com/tomfran/lsm/utils/BenchmarkUtils.java index 7ff04b9..e27493e 100644 --- a/src/jmh/java/com/tomfran/lsm/utils/BenchmarkUtils.java +++ b/src/jmh/java/com/tomfran/lsm/utils/BenchmarkUtils.java @@ -12,11 +12,11 @@ public class BenchmarkUtils { - public static LSMTree initTree(Path dir, int memSize, int levelSize) { + public static LSMTree initTree(Path dir, int memSize, int immutableSize) { if (Files.exists(dir)) deleteDir(dir); - return new LSMTree(memSize, levelSize, dir.toString()); + return new LSMTree(memSize, immutableSize, dir.toString()); } public static void stopTreeAndCleanDisk(LSMTree tree, Path dir) { diff --git a/src/main/java/com/tomfran/lsm/Main.java b/src/main/java/com/tomfran/lsm/Main.java index d69aaf7..614f59e 100644 --- a/src/main/java/com/tomfran/lsm/Main.java +++ b/src/main/java/com/tomfran/lsm/Main.java @@ -6,7 +6,9 @@ import java.io.File; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Random; import java.util.Scanner; +import java.util.stream.IntStream; import java.util.stream.Stream; public class Main { @@ -18,30 +20,34 @@ public static void main(String[] args) throws InterruptedException { if (new File(DIRECTORY).exists()) deleteDir(); - LSMTree tree = new LSMTree(5, 2, DIRECTORY); + LSMTree tree = new LSMTree(1024 * 1024 * 5, 2, DIRECTORY); Scanner scanner = new Scanner(System.in); scanner.useDelimiter("\n"); String intro = """ - - | __| \\ | __ __| \s - | \\__ \\ |\\/ | ____| | _| -_) -_)\s - ____| ____/ _| _| _| _| \\___| \\___|\s - """; + + | __| \\ | __ __| \s + | \\__ \\ |\\/ | ____| | _| -_) -_)\s + ____| ____/ _| _| _| _| \\___| \\___|\s + """; String help = """ - Commands: - - s/set : insert a key-value pair; - - g/get : get a key value; - - d/del : delete a key; - - e/exit : stop the console; - - h/help : show this message. - """; + Commands: + - s/set : insert a key-value pair; + - r/rgn : insert this range of numeric keys with random values; + - g/get : get a key value; + - d/del : delete a key; + - p/prt : print current tree status; + - e/exit : stop the console; + - h/help : show this message. + """; System.out.println(intro); System.out.println(help); + Random r = new Random(); + boolean exit = false; while (!exit) { @@ -56,6 +62,12 @@ public static void main(String[] args) throws InterruptedException { tree.add(new ByteArrayPair(parts[1].getBytes(), parts[2].getBytes())); System.out.println("ok"); } + case "r", "rng" -> { + IntStream.range(Integer.parseInt(parts[1]), Integer.parseInt(parts[2])) + .forEach(i -> tree.add(new ByteArrayPair(intToBytes(i), intToBytes(r.nextInt())))); + + System.out.println("ok"); + } case "d", "del" -> { tree.delete(parts[1].getBytes()); System.out.println("ok"); @@ -64,12 +76,14 @@ public static void main(String[] args) throws InterruptedException { byte[] value = tree.get(parts[1].getBytes()); System.out.println((value == null || value.length == 0) ? "not found" : new String(value)); } + case "p", "prt" -> System.out.println(tree); case "h", "help" -> System.out.println(help); case "e", "exit" -> exit = true; default -> System.out.println("Unknown command"); } } catch (Exception e) { System.out.printf("### error while executing command: \"%s\"\n", command); + e.printStackTrace(); } } tree.stop(); @@ -85,4 +99,13 @@ static private void deleteDir() { } } + static byte[] intToBytes(int i) { + byte[] result = new byte[4]; + result[0] = (byte) (i & 0xFF); + result[1] = (byte) ((i >> 8) & 0xFF); + result[2] = (byte) ((i >> 16) & 0xFF); + result[3] = (byte) ((i >> 24) & 0xFF); + return result; + } + } diff --git a/src/main/java/com/tomfran/lsm/sstable/SSTable.java b/src/main/java/com/tomfran/lsm/sstable/SSTable.java index 94440f1..61625a3 100644 --- a/src/main/java/com/tomfran/lsm/sstable/SSTable.java +++ b/src/main/java/com/tomfran/lsm/sstable/SSTable.java @@ -1,6 +1,7 @@ package com.tomfran.lsm.sstable; import com.tomfran.lsm.bloom.BloomFilter; +import com.tomfran.lsm.comparator.ByteArrayComparator; import com.tomfran.lsm.io.ExtendedInputStream; import com.tomfran.lsm.io.ExtendedOutputStream; import com.tomfran.lsm.types.ByteArrayPair; @@ -15,6 +16,8 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import static com.tomfran.lsm.comparator.ByteArrayComparator.compare; @@ -24,15 +27,22 @@ public class SSTable implements Iterable { public static final String BLOOM_FILE_EXTENSION = ".bloom"; public static final String INDEX_FILE_EXTENSION = ".index"; - String filename; + private static final int DEFAULT_SAMPLE_SIZE = 1000; + + static final AtomicLong SST_COUNTER = new AtomicLong(); + + public String filename; ExtendedInputStream is; - int size; + public int size; LongArrayList sparseOffsets; IntArrayList sparseSizeCount; ObjectArrayList sparseKeys; BloomFilter bloomFilter; + byte[] minKey; + byte[] maxKey; + /** * Create a new SSTable from an Iterable of Items. * @@ -40,9 +50,22 @@ public class SSTable implements Iterable { * @param items The items to write to the SSTable, assumed to be sorted. * @param sampleSize The number of items to skip between sparse index entries. */ - public SSTable(String filename, Iterator items, int sampleSize) { + public SSTable(String directory, Iterator items, int sampleSize) { + this(getNextSstFilename(directory), items, sampleSize, 1024 * 1024 * 256); + } + + public SSTable(String directory, Iterator items) { + this(getNextSstFilename(directory), items, DEFAULT_SAMPLE_SIZE, 1024 * 1024 * 256); + } + + public SSTable(String directory, Iterator items, long maxByteSize) { + this(getNextSstFilename(directory), items, DEFAULT_SAMPLE_SIZE, maxByteSize); + } + + + public SSTable(String filename, Iterator items, int sampleSize, long maxByteSize) { this.filename = filename; - writeItems(filename, items, sampleSize); + writeItems(filename, items, sampleSize, maxByteSize); is = new ExtendedInputStream(filename + DATA_FILE_EXTENSION); } @@ -56,62 +79,19 @@ public SSTable(String filename) { initializeFromDisk(filename); } - /** - * Merge multiple SSTables into a single SSTable. - * - * @param filename The filename to write the SSTable to. - * @param sampleSize The number of items to skip between sparse index entries. - * @param tables The SSTables to merge. - * @return The merged SSTable. - */ - public static SSTable merge(String filename, int sampleSize, Iterable... tables) { - Iterator[] itArray = Arrays.stream(tables).map(Iterable::iterator) - .toArray(Iterator[]::new); + public static ObjectArrayList sortedRun(String dataDir, long sstMaxSize, SSTable... tables) { + SSTableIterator[] itArray = Arrays.stream(tables).map(SSTable::iterator).toArray(SSTableIterator[]::new); IteratorMerger merger = new IteratorMerger<>(itArray); UniqueSortedIterator uniqueSortedIterator = new UniqueSortedIterator<>(merger); - return new SSTable(filename, uniqueSortedIterator, sampleSize); - } - - public static SSTable merge(String filename, int sampleSize, LinkedList tableLinkedList) { - return merge(filename, sampleSize, tableLinkedList.toArray(new Iterable[]{})); - } - - private void initializeFromDisk(String filename) { - // items file - is = new ExtendedInputStream(filename + DATA_FILE_EXTENSION); - - // sparse index - sparseOffsets = new LongArrayList(); - sparseSizeCount = new IntArrayList(); - sparseKeys = new ObjectArrayList<>(); - - ExtendedInputStream indexIs = new ExtendedInputStream(filename + INDEX_FILE_EXTENSION); - size = indexIs.readVByteInt(); - - int sparseSize = indexIs.readVByteInt(); - long offsetsCumulative = 0; - sparseOffsets.add(offsetsCumulative); - for (int i = 0; i < sparseSize - 1; i++) { - offsetsCumulative += indexIs.readVByteLong(); - sparseOffsets.add(offsetsCumulative); - } + ObjectArrayList res = new ObjectArrayList<>(); - int sizeCumulative = 0; - sparseSizeCount.add(sizeCumulative); - for (int i = 0; i < sparseSize - 1; i++) { - sizeCumulative += indexIs.readVByteInt(); - sparseSizeCount.add(sizeCumulative); + while (uniqueSortedIterator.hasNext()) { + res.add(new SSTable(getNextSstFilename(dataDir), uniqueSortedIterator, DEFAULT_SAMPLE_SIZE, sstMaxSize)); } - for (int i = 0; i < sparseSize; i++) - sparseKeys.add(indexIs.readNBytes(indexIs.readVByteInt())); - - is.close(); - - // bloom filter - bloomFilter = BloomFilter.readFromFile(filename + BLOOM_FILE_EXTENSION); + return res; } /** @@ -121,7 +101,9 @@ private void initializeFromDisk(String filename) { * @return The item with the given key, or null if no such item exists. */ public byte[] get(byte[] key) { - if (!bloomFilter.mightContain(key)) + if (ByteArrayComparator.compare(key, minKey) == -1 || + ByteArrayComparator.compare(key, maxKey) == 1 || + !bloomFilter.mightContain(key)) return null; int offsetIndex = getCandidateOffsetIndex(key); @@ -193,6 +175,46 @@ public void closeAndDelete() { deleteFiles(); } + private static String getNextSstFilename(String directory) { + return String.format("%s/sst_%d", directory, SST_COUNTER.incrementAndGet()); + } + + private void initializeFromDisk(String filename) { + // items file + is = new ExtendedInputStream(filename + DATA_FILE_EXTENSION); + + // sparse index + sparseOffsets = new LongArrayList(); + sparseSizeCount = new IntArrayList(); + sparseKeys = new ObjectArrayList<>(); + + ExtendedInputStream indexIs = new ExtendedInputStream(filename + INDEX_FILE_EXTENSION); + size = indexIs.readVByteInt(); + + int sparseSize = indexIs.readVByteInt(); + long offsetsCumulative = 0; + sparseOffsets.add(offsetsCumulative); + for (int i = 0; i < sparseSize - 1; i++) { + offsetsCumulative += indexIs.readVByteLong(); + sparseOffsets.add(offsetsCumulative); + } + + int sizeCumulative = 0; + sparseSizeCount.add(sizeCumulative); + for (int i = 0; i < sparseSize - 1; i++) { + sizeCumulative += indexIs.readVByteInt(); + sparseSizeCount.add(sizeCumulative); + } + + for (int i = 0; i < sparseSize; i++) + sparseKeys.add(indexIs.readNBytes(indexIs.readVByteInt())); + + is.close(); + + // bloom filter + bloomFilter = BloomFilter.readFromFile(filename + BLOOM_FILE_EXTENSION); + } + private int getCandidateOffsetIndex(byte[] key) { int low = 0; int high = sparseOffsets.size() - 1; @@ -211,7 +233,7 @@ else if (cmp > 0) return low; } - private void writeItems(String filename, Iterator items, int sampleSize) { + private void writeItems(String filename, Iterator items, int sampleSize, long maxByteSize) { ExtendedOutputStream ios = new ExtendedOutputStream(filename + DATA_FILE_EXTENSION); sparseOffsets = new LongArrayList(); @@ -220,33 +242,45 @@ private void writeItems(String filename, Iterator items, int samp bloomFilter = new BloomFilter(); // write items and populate indexes - int size = 0; + int numElements = 0; long offset = 0L; - while (items.hasNext()) { + long byteSize = 0L; + + while (items.hasNext() && byteSize < maxByteSize) { ByteArrayPair item = items.next(); - if (size % sampleSize == 0) { + + if (minKey == null) + minKey = item.key(); + + maxKey = item.key(); + + if (numElements % sampleSize == 0) { sparseOffsets.add(offset); - sparseSizeCount.add(size); + sparseSizeCount.add(numElements); sparseKeys.add(item.key()); } + bloomFilter.add(item.key()); offset += ios.writeByteArrayPair(item); - size++; + numElements++; + + byteSize += item.size(); } + ios.close(); - if (size == 0) { + if (numElements == 0) { throw new IllegalArgumentException("Attempted to create an SSTable from an empty iterator"); } - this.size = size; + this.size = numElements; // write bloom filter and index to disk bloomFilter.writeToFile(filename + BLOOM_FILE_EXTENSION); ExtendedOutputStream indexOs = new ExtendedOutputStream(filename + INDEX_FILE_EXTENSION); - indexOs.writeVByteInt(size); + indexOs.writeVByteInt(numElements); int sparseSize = sparseOffsets.size(); indexOs.writeVByteInt(sparseSize); diff --git a/src/main/java/com/tomfran/lsm/tree/LSMTree.java b/src/main/java/com/tomfran/lsm/tree/LSMTree.java index dc87ba5..07bdcfa 100644 --- a/src/main/java/com/tomfran/lsm/tree/LSMTree.java +++ b/src/main/java/com/tomfran/lsm/tree/LSMTree.java @@ -8,9 +8,11 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.LinkedList; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; /** * LSM Tree implementation. @@ -25,9 +27,10 @@ */ public class LSMTree { - static final long DEFAULT_MEMTABLE_MAX__BYTE_SIZE = 1024 * 1024 * 256; - static final int DEFAULT_TABLE_LEVEL_MAX_SIZE = 5; - static final int DEFAULT_SSTABLE_SAMPLE_SIZE = 1 << 10; + static final long DEFAULT_MEMTABLE_MAX_BYTE_SIZE = 1024 * 1024 * 32; + static final int DEFAULT_LEVEL_ZERO_MAX_SIZE = 2; + static final int LEVEL_INCR_FACTOR = 2; + static final String DEFAULT_DATA_DIRECTORY = "LSM-data"; final Object mutableMemtableLock = new Object(); @@ -35,41 +38,47 @@ public class LSMTree { final Object tableLock = new Object(); final long mutableMemtableMaxSize; - final int tableLevelMaxSize; + final int maxLevelZeroSstNumber; + final long maxLevelZeroSstByteSize; final String dataDir; Memtable mutableMemtable; LinkedList immutableMemtables; - ObjectArrayList> tables; - ExecutorService memtableFlusher; - ExecutorService tableCompactor; + ObjectArrayList> levels; + + ScheduledExecutorService memtableFlusher; + ScheduledExecutorService tableCompactor; /** * Creates a new LSMTree with a default memtable size and data directory. */ public LSMTree() { - this(DEFAULT_MEMTABLE_MAX__BYTE_SIZE, DEFAULT_TABLE_LEVEL_MAX_SIZE, DEFAULT_DATA_DIRECTORY); + this(DEFAULT_MEMTABLE_MAX_BYTE_SIZE, DEFAULT_LEVEL_ZERO_MAX_SIZE, DEFAULT_DATA_DIRECTORY); } /** * Creates a new LSMTree with a memtable size and data directory. * * @param mutableMemtableMaxByteSize The maximum size of the memtable before it is flushed to disk. - * @param dataDir The directory to store the data in. + * @param dataDir The directory to store the data in. */ - public LSMTree(long mutableMemtableMaxByteSize, int tableLevelMaxSize, String dataDir) { + public LSMTree(long mutableMemtableMaxByteSize, int maxLevelZeroSstNumber, String dataDir) { this.mutableMemtableMaxSize = mutableMemtableMaxByteSize; - this.tableLevelMaxSize = tableLevelMaxSize; + this.maxLevelZeroSstNumber = maxLevelZeroSstNumber; + this.maxLevelZeroSstByteSize = mutableMemtableMaxByteSize * 2; this.dataDir = dataDir; createDataDir(); mutableMemtable = new Memtable(); immutableMemtables = new LinkedList<>(); - tables = new ObjectArrayList<>(); - tables.add(new LinkedList<>()); + levels = new ObjectArrayList<>(); + levels.add(new ObjectArrayList<>()); - memtableFlusher = newSingleThreadExecutor(); - tableCompactor = newSingleThreadExecutor(); + memtableFlusher = newSingleThreadScheduledExecutor(); + memtableFlusher.scheduleAtFixedRate(this::flushMemtable, 50, 50, TimeUnit.MILLISECONDS); + + tableCompactor = newSingleThreadScheduledExecutor(); + tableCompactor.scheduleAtFixedRate(this::levelCompaction, 200, 200, TimeUnit.MILLISECONDS); } @@ -120,7 +129,7 @@ public byte[] get(byte[] key) { } synchronized (tableLock) { - for (LinkedList level : tables) + for (ObjectArrayList level : levels) for (SSTable table : level) if ((result = table.get(key)) != null) return result; @@ -144,14 +153,11 @@ private void checkMemtableSize() { synchronized (immutableMemtablesLock) { immutableMemtables.addFirst(mutableMemtable); mutableMemtable = new Memtable(); - memtableFlusher.execute(this::flushLastMemtable); } } - private void flushLastMemtable() { + private void flushMemtable() { Memtable memtableToFlush; - - // extract immutable memtable which need to be flushed synchronized (immutableMemtablesLock) { if (immutableMemtables.isEmpty()) return; @@ -159,45 +165,57 @@ private void flushLastMemtable() { memtableToFlush = immutableMemtables.getLast(); } - String filename = getTableName(); + SSTable table = new SSTable(dataDir, memtableToFlush.iterator(), mutableMemtableMaxSize * 2); synchronized (tableLock) { - tables.get(0).addFirst(new SSTable(filename, memtableToFlush.iterator(), DEFAULT_SSTABLE_SAMPLE_SIZE)); - tableCompactor.execute(this::compactTables); + levels.get(0).add(table); } - // remove flushed memtable from immutable memtables synchronized (immutableMemtablesLock) { immutableMemtables.removeLast(); } } - private void compactTables() { + private void levelCompaction() { synchronized (tableLock) { + int n = levels.size(); - int n = tables.size(); + int maxLevelSize = maxLevelZeroSstNumber; + long sstMaxSize = maxLevelZeroSstByteSize; for (int i = 0; i < n; i++) { - var level = tables.get(i); - if (level.size() <= tableLevelMaxSize) - continue; + ObjectArrayList level = levels.get(i); - var table = SSTable.merge(getTableName(), DEFAULT_SSTABLE_SAMPLE_SIZE, level); + if (level.size() <= maxLevelSize) + continue; + // add new level if needed if (i == n - 1) - tables.add(new LinkedList<>()); + levels.add(new ObjectArrayList<>()); + + // take all tables from the current and next level + ObjectArrayList nextLevel = levels.get(i + 1); + ObjectArrayList merge = new ObjectArrayList<>(); + merge.addAll(level); + merge.addAll(nextLevel); + + // perform a sorted run and replace the next level + var sortedRun = SSTable.sortedRun(dataDir, sstMaxSize, merge.toArray(SSTable[]::new)); - tables.get(i + 1).addFirst(table); + // delete previous tables level.forEach(SSTable::closeAndDelete); level.clear(); + nextLevel.forEach(SSTable::closeAndDelete); + nextLevel.clear(); + + nextLevel.addAll(sortedRun); + + maxLevelSize *= LEVEL_INCR_FACTOR; + sstMaxSize *= LEVEL_INCR_FACTOR; } } } - private String getTableName() { - return String.format("%s/sst_%d", dataDir, System.currentTimeMillis()); - } - private void createDataDir() { try { Files.createDirectory(Path.of(dataDir)); @@ -206,4 +224,30 @@ private void createDataDir() { } } + + @Override + public String toString() { + + var s = new StringBuilder(); + s.append("LSM-Tree {\n"); + s.append("\tmemtable: "); + s.append(mutableMemtable.byteSize() / 1024.0 / 1024.0); + s.append(" mb\n"); + s.append("\timmutable memtables: "); + s.append(immutableMemtables); + s.append("\n\tsst levels:\n"); + + int i = 0; + for (var level : levels) { + s.append(String.format("\t\t- %d: ", i)); + level.stream() + .map(st -> String.format("[ %s, size: %d ] ", st.filename, st.size)) + .forEach(s::append); + s.append("\n"); + i += 1; + } + + s.append("}"); + return s.toString(); + } } diff --git a/src/test/java/com/tomfran/lsm/sstable/SSTableMergeTest.java b/src/test/java/com/tomfran/lsm/sstable/SSTableMergeTest.java index 87e081e..34e2ef5 100644 --- a/src/test/java/com/tomfran/lsm/sstable/SSTableMergeTest.java +++ b/src/test/java/com/tomfran/lsm/sstable/SSTableMergeTest.java @@ -19,7 +19,7 @@ public class SSTableMergeTest { static final String MERGE_FILE = "/merge", TABLE_FILE2 = "/test2"; @TempDir static Path tempDirectory; - static SSTable merge, second; + static SSTable merge, first, second; static List firstItems, secondItems, expectedItems; @BeforeAll @@ -39,16 +39,17 @@ public static void setup() { expectedItems.addAll(firstItems); secondItems.stream().skip(n / 2).forEach(expectedItems::add); - var first = new Memtable(); - firstItems.forEach(first::add); - second = new SSTable(tempDirectory + TABLE_FILE2, secondItems.iterator(), 100); - merge = SSTable.merge(tempDirectory + MERGE_FILE, 100, first, second); + first = new SSTable(tempDirectory.toString(), firstItems.iterator(), 100); + second = new SSTable(tempDirectory.toString(), secondItems.iterator(), 100); + + merge = SSTable.sortedRun(tempDirectory.toString(), 1024 * 1024 * 1024, first, second).get(0); } @AfterAll public static void teardown() { merge.close(); + first.close(); second.close(); } diff --git a/src/test/java/com/tomfran/lsm/sstable/SSTableReconstructTest.java b/src/test/java/com/tomfran/lsm/sstable/SSTableReconstructTest.java index 6ed2417..9deb07a 100644 --- a/src/test/java/com/tomfran/lsm/sstable/SSTableReconstructTest.java +++ b/src/test/java/com/tomfran/lsm/sstable/SSTableReconstructTest.java @@ -17,7 +17,6 @@ public class SSTableReconstructTest { - static final String FILE1 = "/sstable1", FILE2 = "/sstable2"; @TempDir static Path tempDirectory; static SSTable t1; @@ -34,17 +33,17 @@ static void setup() throws IOException { .sorted((a, b) -> compare(a.key(), b.key())) .toList(); - t1 = new SSTable(tempDirectory + FILE1, items.iterator(), 3); + t1 = new SSTable(tempDirectory.toString(), items.iterator(), 3); for (var end : List.of(INDEX_FILE_EXTENSION, DATA_FILE_EXTENSION, BLOOM_FILE_EXTENSION)) - Files.copy(Path.of(tempDirectory + FILE1 + end), Path.of(tempDirectory + FILE2 + end)); + Files.copy(Path.of(t1.filename + end), Path.of(t1.filename + "_copy" + end)); t1.close(); } @Test void shouldReconstruct() { - var t2 = new SSTable(tempDirectory + FILE2); + var t2 = new SSTable(t1.filename + "_copy"); assert t2.size == t1.size; diff --git a/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java b/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java index d826edd..fb31367 100644 --- a/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java +++ b/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java @@ -16,7 +16,6 @@ class SSTableTest { - static final String TEST_FILE = "/sstable"; static final int NUM_ITEMS = 10; static final int SAMPLE_SIZE = NUM_ITEMS / 3; @@ -37,8 +36,8 @@ public static void setup() { // sort and divide into inserted and skipped var items = l.stream() - .sorted((a, b) -> ByteArrayComparator.compare(a.key(), b.key())) - .toList(); + .sorted((a, b) -> ByteArrayComparator.compare(a.key(), b.key())) + .toList(); inserted = new ObjectArrayList<>(); skipped = new ObjectArrayList<>(); @@ -51,7 +50,7 @@ public static void setup() { skipped.add(e); } - t = new SSTable(tempDirectory + TEST_FILE, inserted.iterator(), SAMPLE_SIZE); + t = new SSTable(tempDirectory.toString(), inserted.iterator(), SAMPLE_SIZE); } @AfterAll diff --git a/src/test/java/com/tomfran/lsm/tree/LSMTreeTest.java b/src/test/java/com/tomfran/lsm/tree/LSMTreeTest.java index 9ab223d..b34d1a4 100644 --- a/src/test/java/com/tomfran/lsm/tree/LSMTreeTest.java +++ b/src/test/java/com/tomfran/lsm/tree/LSMTreeTest.java @@ -25,7 +25,7 @@ public void writeFlush() throws InterruptedException { Thread.sleep(500); assert tree.mutableMemtable.byteSize() >= 1 : "mutable memtable size is " + tree.mutableMemtable.byteSize(); - assert !tree.tables.get(0).isEmpty() : "table is null"; + assert !tree.levels.get(0).isEmpty() : "table is null"; tree.stop(); } diff --git a/src/test/java/com/tomfran/lsm/utils/UniqueSortedIteratorTest.java b/src/test/java/com/tomfran/lsm/utils/UniqueSortedIteratorTest.java index b7759e2..5b1c533 100644 --- a/src/test/java/com/tomfran/lsm/utils/UniqueSortedIteratorTest.java +++ b/src/test/java/com/tomfran/lsm/utils/UniqueSortedIteratorTest.java @@ -19,7 +19,6 @@ public void test() { assert res.size() == 10; for (int i = 0; i < 10; i++) assert res.get(i) == i; - } private static class DummyIterator implements Iterator {