From 4b37d334c65e12b756dcf2368cae47fc4baa5037 Mon Sep 17 00:00:00 2001 From: Fran Date: Thu, 5 Oct 2023 22:47:49 +0200 Subject: [PATCH] LSM tree first draft, flushing not working --- README.md | 32 ++-- build.gradle | 17 +- .../lsm/bloom/BloomFilterBenchmark.java | 2 +- .../lsm/memtable/SkipListBenchmark.java | 2 +- .../tomfran/lsm/sstable/SSTableBenchmark.java | 4 +- .../tomfran/lsm/tree/LSMTreeBenchmark.java | 78 ++++++++++ src/main/java/com/tomfran/lsm/Main.java | 85 ++++++++++ .../com/tomfran/lsm/bloom/BloomFilter.java | 11 ++ .../com/tomfran/lsm/io/BaseInputStream.java | 2 +- .../com/tomfran/lsm/io/BaseOutputStream.java | 2 + .../com/tomfran/lsm/memtable/Memtable.java | 17 +- .../com/tomfran/lsm/memtable/SkipList.java | 13 +- .../java/com/tomfran/lsm/sstable/SSTable.java | 33 ++-- .../java/com/tomfran/lsm/tree/LSMTree.java | 145 +++++++++++++++--- .../tomfran/lsm/sstable/SSTableMergeTest.java | 14 +- .../lsm/sstable/SSTableReconstructTest.java | 2 +- .../com/tomfran/lsm/sstable/SSTableTest.java | 2 +- .../com/tomfran/lsm/tree/LSMTreeTest.java | 52 +++++++ 18 files changed, 445 insertions(+), 68 deletions(-) create mode 100644 src/jmh/java/com/tomfran/lsm/tree/LSMTreeBenchmark.java create mode 100644 src/main/java/com/tomfran/lsm/Main.java create mode 100644 src/test/java/com/tomfran/lsm/tree/LSMTreeTest.java diff --git a/README.md b/README.md index 3cdbdf7..dc338fc 100644 --- a/README.md +++ b/README.md @@ -104,15 +104,19 @@ the operation on the node. All of them have an average time complexity of `O(log I am using [JMH](https://openjdk.java.net/projects/code-tools/jmh/) to run benchmarks, the results are obtained on a MacBook Pro (16-inch, 2021) with an M1 Pro processor and 16 GB of RAM. +To run them use `./gradlew jmh`. + ### SSTable - Negative access: the key is not present in the table, hence the Bloom filter will likely stop the search; - Random access: the key is present in the table, the order of the keys is random. ``` -Benchmark Mode Cnt Score Error Units -c.t.l.sstable.SSTableBenchmark.negativeAccess thrpt 10 3541989.316 ± 78933.780 ops/s -c.t.l.sstable.SSTableBenchmark.randomAccess thrpt 10 56157.613 ± 264.314 ops/s + +Benchmark Mode Cnt Score Error Units +c.t.l.sstable.SSTableBenchmark.negativeAccess thrpt 10 3541989.316 ± 78933.780 ops/s +c.t.l.sstable.SSTableBenchmark.randomAccess thrpt 10 56157.613 ± 264.314 ops/s + ``` ### Bloom filter @@ -121,9 +125,11 @@ c.t.l.sstable.SSTableBenchmark.randomAccess thrpt 10 56157.613 ± 26 - Contains: test whether the keys are present in the Bloom filter. ``` -Benchmark Mode Cnt Score Error Units -c.t.l.bloom.BloomFilterBenchmark.add thrpt 10 9777191.526 ± 168208.916 ops/s -c.t.l.bloom.BloomFilterBenchmark.contains thrpt 10 10724196.205 ± 20411.741 ops/s + +Benchmark Mode Cnt Score Error Units +c.t.l.bloom.BloomFilterBenchmark.add thrpt 10 9777191.526 ± 168208.916 ops/s +c.t.l.bloom.BloomFilterBenchmark.contains thrpt 10 10724196.205 ± 20411.741 ops/s + ``` ### Skip-List @@ -132,9 +138,11 @@ c.t.l.bloom.BloomFilterBenchmark.contains thrpt 10 10724196.205 ± 2041 - Add/Remove: add and remove keys from a 100k keys skip-list. ``` -Benchmark Mode Cnt Score Error Units -c.t.l.memtable.SkipListBenchmark.addRemove thrpt 10 684885.546 ± 21793.787 ops/s -c.t.l.memtable.SkipListBenchmark.get thrpt 10 823423.128 ± 83028.354 ops/s + +Benchmark Mode Cnt Score Error Units +c.t.l.memtable.SkipListBenchmark.addRemove thrpt 10 684885.546 ± 21793.787 ops/s +c.t.l.memtable.SkipListBenchmark.get thrpt 10 823423.128 ± 83028.354 ops/s + ``` --- @@ -149,14 +157,16 @@ c.t.l.memtable.SkipListBenchmark.get thrpt 10 823423.128 ± 8302 - [x] Bloom filter - [x] Indexes persistence - [x] File initialization + - [ ] Handle tombstones - [ ] Skip-List - [x] Operations - [x] Iterator - [ ] Tree - - [ ] Operations + - [x] Operations + - [ ] Background flush - [ ] Background compaction - [ ] Benchmarks - [x] SSTable - [x] Bloom filter - [x] Skip-List - - [ ] Tree \ No newline at end of file + - [ ] Tree diff --git a/build.gradle b/build.gradle index f78f215..972ca99 100644 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,7 @@ plugins { - id("java") + id "java" id "me.champeau.jmh" version "0.7.1" + id "application" } group = "com.tomfran" @@ -27,8 +28,20 @@ jmh { fork = 1 warmupIterations = 5 iterations = 10 - benchmarkMode = ['thrpt'] + benchmarkMode = ['avgt'] jmhTimeout = '15s' jmhVersion = '1.37' resultFormat = 'JSON' +} + +ext { + javaMainClass = "com.tomfran.lsm.Main" +} + +application { + mainClassName = javaMainClass +} + +run { + standardInput = System.in } \ No newline at end of file diff --git a/src/jmh/java/com/tomfran/lsm/bloom/BloomFilterBenchmark.java b/src/jmh/java/com/tomfran/lsm/bloom/BloomFilterBenchmark.java index 2a27f90..26cf861 100644 --- a/src/jmh/java/com/tomfran/lsm/bloom/BloomFilterBenchmark.java +++ b/src/jmh/java/com/tomfran/lsm/bloom/BloomFilterBenchmark.java @@ -7,7 +7,7 @@ import static com.tomfran.lsm.TestUtils.getRandomByteArray; -@OutputTimeUnit(TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Benchmark) public class BloomFilterBenchmark { diff --git a/src/jmh/java/com/tomfran/lsm/memtable/SkipListBenchmark.java b/src/jmh/java/com/tomfran/lsm/memtable/SkipListBenchmark.java index febbe2d..f87f333 100644 --- a/src/jmh/java/com/tomfran/lsm/memtable/SkipListBenchmark.java +++ b/src/jmh/java/com/tomfran/lsm/memtable/SkipListBenchmark.java @@ -9,7 +9,7 @@ import static com.tomfran.lsm.TestUtils.getRandomPair; -@OutputTimeUnit(TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Benchmark) public class SkipListBenchmark { diff --git a/src/jmh/java/com/tomfran/lsm/sstable/SSTableBenchmark.java b/src/jmh/java/com/tomfran/lsm/sstable/SSTableBenchmark.java index bdbd265..fa8796f 100644 --- a/src/jmh/java/com/tomfran/lsm/sstable/SSTableBenchmark.java +++ b/src/jmh/java/com/tomfran/lsm/sstable/SSTableBenchmark.java @@ -15,7 +15,7 @@ import static com.tomfran.lsm.TestUtils.getRandomPair; -@OutputTimeUnit(TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.NANOSECONDS) @State(Scope.Benchmark) public class SSTableBenchmark { @@ -59,7 +59,7 @@ public void setup() throws IOException { skipped.add(e); } - sstable = new SSTable(DIR + "/sst", inserted, SAMPLE_SIZE, inserted.size()); + sstable = new SSTable(DIR + "/sst", inserted, SAMPLE_SIZE); // shuffle to avoid sequential access Collections.shuffle(inserted); diff --git a/src/jmh/java/com/tomfran/lsm/tree/LSMTreeBenchmark.java b/src/jmh/java/com/tomfran/lsm/tree/LSMTreeBenchmark.java new file mode 100644 index 0000000..4ccd77b --- /dev/null +++ b/src/jmh/java/com/tomfran/lsm/tree/LSMTreeBenchmark.java @@ -0,0 +1,78 @@ +package com.tomfran.lsm.tree; + +import com.tomfran.lsm.types.ByteArrayPair; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.TimeUnit; + +import static com.tomfran.lsm.TestUtils.getRandomPair; + +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +public class LSMTreeBenchmark { + + static final Path DIR = Path.of("tree_benchmark"); + static final int NUM_ITEMS = 300000; + + static ByteArrayPair[] items; + static int index = 0; + + LSMTree tree; + + @Setup + public void setup() throws IOException { + // setup directory + if (Files.exists(DIR)) + deleteDir(); + + // generate random items + items = new ByteArrayPair[NUM_ITEMS]; + for (int i = 0; i < NUM_ITEMS; i++) + items[i] = getRandomPair(); + + // setup tree + tree = new LSMTree(1 << 15, DIR.toString()); + } + + @TearDown + public void teardown() throws IOException { + tree.stop(); + deleteDir(); + } + + private void deleteDir() throws IOException { + try (var files = Files.list(DIR)) { + files.forEach(f -> { + try { + Files.delete(f); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } + Files.delete(DIR); + } + + @Benchmark + public void add() { + var item = items[index]; + tree.add(item); + + index = (index + 1) % NUM_ITEMS; + } + + @Benchmark + public void get(Blackhole bh) { + var item = items[index]; + var value = tree.get(item.key()); + + bh.consume(value); + + index = (index + 1) % NUM_ITEMS; + } + +} diff --git a/src/main/java/com/tomfran/lsm/Main.java b/src/main/java/com/tomfran/lsm/Main.java new file mode 100644 index 0000000..3a287a6 --- /dev/null +++ b/src/main/java/com/tomfran/lsm/Main.java @@ -0,0 +1,85 @@ +package com.tomfran.lsm; + +import com.tomfran.lsm.tree.LSMTree; +import com.tomfran.lsm.types.ByteArrayPair; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Scanner; + +public class Main { + + static final String DIRECTORY = "LSM-data"; + + public static void main(String[] args) throws IOException { + + if (new File(DIRECTORY).exists()) + deleteDir(); + + LSMTree tree = new LSMTree(3, DIRECTORY); + + Scanner scanner = new Scanner(System.in); + scanner.useDelimiter("\n"); + + System.out.println( + """ + LSM Tree console + + Commands: + - ins : insert a key-value pair + - get : get a value for a key + - del : delete a key-value pair + - exit : exit the application + + """ + ); + + boolean exit = false; + + while (!exit) { + System.out.print("Enter a command: "); + String command = scanner.nextLine(); + + var parts = command.split(" "); + + switch (parts[0]) { + case "exit" -> { + System.out.println("Exiting..."); + exit = true; + } + case "ins" -> tree.add(new ByteArrayPair(parts[1].getBytes(), parts[2].getBytes())); + case "del" -> tree.delete(parts[1].getBytes()); + case "get" -> { + String key = parts[1]; + byte[] value = tree.get(key.getBytes()); + + var msg = (value == null || value.length == 0) ? "No value found for key " + key : + "Value for key " + key + " is " + new String(value); + System.out.println(msg); + } + default -> System.out.println("Unknown command: " + command); + } + System.out.println(); + } + tree.stop(); + scanner.close(); + + deleteDir(); + } + + static private void deleteDir() throws IOException { + try (var files = Files.list(Path.of(DIRECTORY))) { + files.forEach(f -> { + try { + Files.delete(f); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } + Files.delete(Path.of(DIRECTORY)); + } + +} diff --git a/src/main/java/com/tomfran/lsm/bloom/BloomFilter.java b/src/main/java/com/tomfran/lsm/bloom/BloomFilter.java index 6662ed2..5dd8c6d 100644 --- a/src/main/java/com/tomfran/lsm/bloom/BloomFilter.java +++ b/src/main/java/com/tomfran/lsm/bloom/BloomFilter.java @@ -22,10 +22,20 @@ */ public class BloomFilter { + static final int DEFAULT_SIZE = 1 << 20; + final int size; final int hashCount; final long[] bits; + /** + * Create a new Bloom filter with the default size and a false positive rate of 0.1%. + */ + public BloomFilter() { + this(DEFAULT_SIZE, 0.001); + } + + /** * Create a new Bloom filter with the given expected insertions and a false positive rate of 0.1%. * @@ -35,6 +45,7 @@ public BloomFilter(int expectedInsertions) { this(expectedInsertions, 0.001); } + /** * Create a new Bloom filter with the given expected insertions and false positive rate. * diff --git a/src/main/java/com/tomfran/lsm/io/BaseInputStream.java b/src/main/java/com/tomfran/lsm/io/BaseInputStream.java index 746c666..8d1bb00 100644 --- a/src/main/java/com/tomfran/lsm/io/BaseInputStream.java +++ b/src/main/java/com/tomfran/lsm/io/BaseInputStream.java @@ -36,7 +36,7 @@ public long readVByteLong() { shift += 7; } - return result; + return result - 1; } public long readLong() { diff --git a/src/main/java/com/tomfran/lsm/io/BaseOutputStream.java b/src/main/java/com/tomfran/lsm/io/BaseOutputStream.java index c535205..7bf4e8c 100644 --- a/src/main/java/com/tomfran/lsm/io/BaseOutputStream.java +++ b/src/main/java/com/tomfran/lsm/io/BaseOutputStream.java @@ -59,6 +59,8 @@ byte[] intToVByte(int n) { } private byte[] longToVByte(long n) { + n++; + if (n <= 0) { throw new IllegalArgumentException("n must be greater than 0"); } diff --git a/src/main/java/com/tomfran/lsm/memtable/Memtable.java b/src/main/java/com/tomfran/lsm/memtable/Memtable.java index 4441579..f25b9f8 100644 --- a/src/main/java/com/tomfran/lsm/memtable/Memtable.java +++ b/src/main/java/com/tomfran/lsm/memtable/Memtable.java @@ -3,10 +3,10 @@ import com.tomfran.lsm.sstable.SSTable; import com.tomfran.lsm.types.ByteArrayPair; -public class Memtable { - - static final int DEFAULT_SSTABLE_SAMPLE_SIZE = 1 << 10; +import java.util.Iterator; +public class Memtable implements Iterable { + SkipList list; public Memtable() { @@ -26,15 +26,20 @@ public byte[] get(byte[] key) { } public void remove(byte[] key) { - list.add(new ByteArrayPair(key, null)); + list.add(new ByteArrayPair(key, new byte[]{})); } public int size() { return list.size(); } - public SSTable flush(String filename) { - return new SSTable(filename, list, DEFAULT_SSTABLE_SAMPLE_SIZE, list.size()); + public SSTable flush(String filename, int sampleSize) { + return new SSTable(filename, list, sampleSize); + } + + @Override + public Iterator iterator() { + return list.iterator(); } } diff --git a/src/main/java/com/tomfran/lsm/memtable/SkipList.java b/src/main/java/com/tomfran/lsm/memtable/SkipList.java index d2a0387..fe2fc6e 100644 --- a/src/main/java/com/tomfran/lsm/memtable/SkipList.java +++ b/src/main/java/com/tomfran/lsm/memtable/SkipList.java @@ -167,7 +167,13 @@ private static final class Node { } - private record SkipListIterator(Node node) implements Iterator { + private static class SkipListIterator implements Iterator { + + Node node; + + SkipListIterator(Node node) { + this.node = node; + } @Override public boolean hasNext() { @@ -176,7 +182,10 @@ public boolean hasNext() { @Override public ByteArrayPair next() { - return node.next[0].val; + var res = node.next[0].val; + node = node.next[0]; + + return res; } } diff --git a/src/main/java/com/tomfran/lsm/sstable/SSTable.java b/src/main/java/com/tomfran/lsm/sstable/SSTable.java index ce75436..b517400 100644 --- a/src/main/java/com/tomfran/lsm/sstable/SSTable.java +++ b/src/main/java/com/tomfran/lsm/sstable/SSTable.java @@ -9,11 +9,13 @@ import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.io.File; import java.util.Iterator; import static com.tomfran.lsm.comparator.ByteArrayComparator.compare; +import static java.util.Arrays.stream; -public class SSTable { +public class SSTable implements Iterable { public static final String DATA_FILE_EXTENSION = ".data"; public static final String BLOOM_FILE_EXTENSION = ".bloom"; @@ -36,9 +38,9 @@ public class SSTable { * @param sampleSize The number of items to skip between sparse index entries. * @param numItems The number of items in the SSTable. */ - public SSTable(String filename, Iterable items, int sampleSize, int numItems) { + public SSTable(String filename, Iterable items, int sampleSize) { this.filename = filename; - writeItems(filename, items, sampleSize, numItems); + writeItems(filename, items, sampleSize); is = new BaseInputStream(filename + DATA_FILE_EXTENSION); } @@ -60,18 +62,12 @@ public SSTable(String filename) { * @param tables The SSTables to merge. * @return The merged SSTable. */ - static SSTable merge(String filename, int sampleSize, SSTable... tables) { + public static SSTable merge(String filename, int sampleSize, Iterable... tables) { - int newSize = 0; - Iterator[] iterators = new Iterator[tables.length]; - for (int i = 0; i < tables.length; i++) { - iterators[i] = tables[i].iterator(); - newSize += tables[i].size; - } - - SSTableMergerIterator it = new SSTableMergerIterator(iterators); + SSTableMergerIterator it = new SSTableMergerIterator(stream(tables).map(Iterable::iterator) + .toArray(Iterator[]::new)); - return new SSTable(filename, it, sampleSize, newSize); + return new SSTable(filename, it, sampleSize); } private void initializeFromDisk(String filename) { @@ -197,13 +193,13 @@ else if (cmp > 0) return low; } - private void writeItems(String filename, Iterable items, int sampleSize, int numItems) { + private void writeItems(String filename, Iterable items, int sampleSize) { BaseOutputStream ios = new BaseOutputStream(filename + DATA_FILE_EXTENSION); sparseOffsets = new LongArrayList(); sparseSizeCount = new IntArrayList(); sparseKeys = new ObjectArrayList<>(); - bloomFilter = new BloomFilter(numItems); + bloomFilter = new BloomFilter(); // write items and populate indexes int size = 0; @@ -253,6 +249,12 @@ private void writeItems(String filename, Iterable items, int samp indexOs.close(); } + public void deleteFiles() { + new File(filename + DATA_FILE_EXTENSION).delete(); + new File(filename + INDEX_FILE_EXTENSION).delete(); + new File(filename + BLOOM_FILE_EXTENSION).delete(); + } + private static class SSTableIterator implements Iterator { private final SSTable table; @@ -271,6 +273,7 @@ public boolean hasNext() { @Override public ByteArrayPair next() { remaining--; + return table.is.readBytePair(); } diff --git a/src/main/java/com/tomfran/lsm/tree/LSMTree.java b/src/main/java/com/tomfran/lsm/tree/LSMTree.java index 660d938..76561da 100644 --- a/src/main/java/com/tomfran/lsm/tree/LSMTree.java +++ b/src/main/java/com/tomfran/lsm/tree/LSMTree.java @@ -4,54 +4,159 @@ import com.tomfran.lsm.sstable.SSTable; import com.tomfran.lsm.types.ByteArrayPair; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.LinkedList; +import java.util.concurrent.ExecutorService; + +import static java.util.concurrent.Executors.newSingleThreadExecutor; public class LSMTree { - static final int MEMTABLE_MAX_SIZE = 1 << 16; + static final int DEFAULT_MEMTABLE_MAX_SIZE = 1 << 15; + static final int DEFAULT_SSTABLE_SAMPLE_SIZE = 1 << 10; + static final String DEFAULT_DATA_DIRECTORY = "LSM-data"; + + final Object mutableMemtableLock = new Object(); + final Object immutableMemtablesLock = new Object(); + final Object tableLock = new Object(); Memtable mutableMemtable; + final int mutableMemtableMaxSize; + final String dataDir; + LinkedList immutableMemtables; - LinkedList tables; + SSTable table; + ExecutorService memtableFlusher; + /** + * Creates a new LSMTree with a default memtable size and data directory. + */ public LSMTree() { - mutableMemtable = new Memtable(MEMTABLE_MAX_SIZE); + this(DEFAULT_MEMTABLE_MAX_SIZE, DEFAULT_DATA_DIRECTORY); + } + + /** + * Creates a new LSMTree with a memtable size and data directory. + * + * @param memtableMaxSize The maximum size of the memtable before it is flushed to disk. + * @param dataDir The directory to store the data in. + */ + public LSMTree(int memtableMaxSize, String dataDir) { + mutableMemtableMaxSize = memtableMaxSize; + this.dataDir = dataDir; + createDataDir(); + + mutableMemtable = new Memtable(memtableMaxSize); immutableMemtables = new LinkedList<>(); - tables = new LinkedList<>(); + memtableFlusher = newSingleThreadExecutor(); } + + /** + * Adds an item to the LSMTree. + * If the memtable is full, it is flushed to disk. + * + * @param item The item to add. + */ public void add(ByteArrayPair item) { - mutableMemtable.add(item); - checkMemtableSize(); + synchronized (mutableMemtableLock) { + mutableMemtable.add(item); + checkMemtableSize(); + } } + /** + * Removes an item from the LSMTree. + * This is done by adding a tombstone to the memtable. + * + * @param key The key of the item to remove. + */ public void delete(byte[] key) { - mutableMemtable.remove(key); - checkMemtableSize(); - } - - private void checkMemtableSize() { - if (mutableMemtable.size() >= MEMTABLE_MAX_SIZE) { - immutableMemtables.add(mutableMemtable); - mutableMemtable = new Memtable(MEMTABLE_MAX_SIZE); + synchronized (mutableMemtableLock) { + mutableMemtable.remove(key); + checkMemtableSize(); } } + /** + * Gets an item from the LSMTree. + * + * @param key The key of the item to get. + * @return The value of the item, or null if it does not exist. + */ public byte[] get(byte[] key) { byte[] result; - if ((result = mutableMemtable.get(key)) != null) - return result; - - for (Memtable memtable : immutableMemtables) - if ((result = memtable.get(key)) != null) + synchronized (mutableMemtableLock) { + if ((result = mutableMemtable.get(key)) != null) return result; + } - for (SSTable table : tables) + synchronized (immutableMemtablesLock) { + for (Memtable memtable : immutableMemtables) + if ((result = memtable.get(key)) != null) + return result; + } + + synchronized (tableLock) { if ((result = table.get(key)) != null) return result; + } return null; } + public void stop() { + memtableFlusher.shutdownNow(); + } + + private void checkMemtableSize() { + if (mutableMemtable.size() <= mutableMemtableMaxSize) + return; + + synchronized (immutableMemtablesLock) { + immutableMemtables.addFirst(mutableMemtable); + mutableMemtable = new Memtable(mutableMemtableMaxSize); + memtableFlusher.execute(this::flushLastMemtable); + } + } + + private void flushLastMemtable() { + Memtable memtableToFlush; + + // extract immutable memtable which need to be flushed + synchronized (immutableMemtablesLock) { + if (immutableMemtables.isEmpty()) + return; + + memtableToFlush = immutableMemtables.getLast(); + } + + String filename = String.format("%s/sst_%d", dataDir, System.currentTimeMillis()); + + synchronized (tableLock) { + if (table == null) + table = mutableMemtable.flush(filename, DEFAULT_SSTABLE_SAMPLE_SIZE); + else { + var newTable = SSTable.merge(filename, DEFAULT_SSTABLE_SAMPLE_SIZE, memtableToFlush, table); + table.deleteFiles(); + table = newTable; + } + } + + // remove flushed memtable from immutable memtables + synchronized (immutableMemtablesLock) { + immutableMemtables.removeLast(); + } + } + + private void createDataDir() { + try { + Files.createDirectory(Path.of(dataDir)); + } catch (Exception e) { + throw new RuntimeException("Could not create data directory", e); + } + } + } diff --git a/src/test/java/com/tomfran/lsm/sstable/SSTableMergeTest.java b/src/test/java/com/tomfran/lsm/sstable/SSTableMergeTest.java index 745ca0b..99c1320 100644 --- a/src/test/java/com/tomfran/lsm/sstable/SSTableMergeTest.java +++ b/src/test/java/com/tomfran/lsm/sstable/SSTableMergeTest.java @@ -1,6 +1,7 @@ package com.tomfran.lsm.sstable; import com.tomfran.lsm.comparator.ByteArrayComparator; +import com.tomfran.lsm.memtable.Memtable; import com.tomfran.lsm.types.ByteArrayPair; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -15,10 +16,11 @@ public class SSTableMergeTest { - static final String MERGE_FILE = "/merge", TABLE_1_FILE = "/test1", TABLE_2_FILE = "/test2"; + static final String MERGE_FILE = "/merge", TABLE_FILE = "/test"; @TempDir static Path tempDirectory; - static SSTable merge, first, second; + static Memtable first; + static SSTable merge, second; static List firstItems, secondItems, expectedItems; @BeforeAll @@ -30,6 +32,7 @@ public static void setup() { // generate overlapping items int n = 10; + firstItems = generatePairList(0, n, false); secondItems = generatePairList(n - n / 2, n * 2, true); @@ -37,8 +40,10 @@ public static void setup() { expectedItems.addAll(firstItems); secondItems.stream().skip(n / 2).forEach(expectedItems::add); - first = new SSTable(tempDirectory + TABLE_1_FILE, firstItems, 100, firstItems.size()); - second = new SSTable(tempDirectory + TABLE_2_FILE, secondItems, 100, secondItems.size()); + + first = new Memtable(); + firstItems.forEach(first::add); + second = new SSTable(tempDirectory + TABLE_FILE, secondItems, 100); merge = SSTable.merge(tempDirectory + MERGE_FILE, 100, first, second); } @@ -46,7 +51,6 @@ public static void setup() { @AfterAll public static void teardown() { merge.close(); - first.close(); second.close(); } diff --git a/src/test/java/com/tomfran/lsm/sstable/SSTableReconstructTest.java b/src/test/java/com/tomfran/lsm/sstable/SSTableReconstructTest.java index b1acbd0..cd56310 100644 --- a/src/test/java/com/tomfran/lsm/sstable/SSTableReconstructTest.java +++ b/src/test/java/com/tomfran/lsm/sstable/SSTableReconstructTest.java @@ -34,7 +34,7 @@ static void setup() throws IOException { .sorted((a, b) -> compare(a.key(), b.key())) .toList(); - t1 = new SSTable(tempDirectory + FILE1, items, 3, items.size()); + t1 = new SSTable(tempDirectory + FILE1, items, 3); for (var end : List.of(INDEX_FILE_EXTENSION, DATA_FILE_EXTENSION, BLOOM_FILE_EXTENSION)) Files.copy(Path.of(tempDirectory + FILE1 + end), Path.of(tempDirectory + FILE2 + end)); diff --git a/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java b/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java index 4fdfc39..497db22 100644 --- a/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java +++ b/src/test/java/com/tomfran/lsm/sstable/SSTableTest.java @@ -51,7 +51,7 @@ public static void setup() { skipped.add(e); } - t = new SSTable(tempDirectory + TEST_FILE, inserted, SAMPLE_SIZE, inserted.size()); + t = new SSTable(tempDirectory + TEST_FILE, inserted, SAMPLE_SIZE); } @AfterAll diff --git a/src/test/java/com/tomfran/lsm/tree/LSMTreeTest.java b/src/test/java/com/tomfran/lsm/tree/LSMTreeTest.java new file mode 100644 index 0000000..1402b29 --- /dev/null +++ b/src/test/java/com/tomfran/lsm/tree/LSMTreeTest.java @@ -0,0 +1,52 @@ +package com.tomfran.lsm.tree; + +import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.stream.IntStream; + +import static com.tomfran.lsm.TestUtils.getRandomPair; +import static com.tomfran.lsm.comparator.ByteArrayComparator.compare; + +class LSMTreeTest { + + @TempDir + static Path tempDirectory; + + @Test + public void writeFlush() throws InterruptedException { + int maxSize = 10; + + LSMTree tree = new LSMTree(maxSize, tempDirectory + "/test1"); + + IntStream.range(0, maxSize + 2).forEach(i -> tree.add(getRandomPair())); + + Thread.sleep(2000); + + assert tree.mutableMemtable.size() == 1 : "mutable memtable size is " + tree.mutableMemtable.size(); + assert tree.table != null : "table is null"; + } + + @Test + public void writeFlow() throws InterruptedException { + int maxSize = 10; + + LSMTree tree = new LSMTree(maxSize, tempDirectory + "/test2"); + + Object2ObjectArrayMap items = new Object2ObjectArrayMap<>(); + + IntStream.range(0, 5 * maxSize).forEach(i -> { + var it = getRandomPair(); + tree.add(it); + items.put(it.key(), it.value()); + }); + + Thread.sleep(5000); + + for (var it : items.entrySet()) + assert compare(tree.get(it.getKey()), it.getValue()) == 0; + } + +} \ No newline at end of file