Skip to content

Commit

Permalink
Fixed LSM tree not flushing, cleanup SSTable iterators, added toy con…
Browse files Browse the repository at this point in the history
…sole
  • Loading branch information
tomfran committed Oct 8, 2023
1 parent 4b37d33 commit ff6271a
Show file tree
Hide file tree
Showing 16 changed files with 189 additions and 72 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ c.t.l.memtable.SkipListBenchmark.get thrpt 10 823423.128 ± 83028.354 ops/s
- [x] Iterator
- [ ] Tree
- [x] Operations
- [ ] Background flush
- [x] Background flush
- [ ] Background compaction
- [ ] Benchmarks
- [x] SSTable
Expand Down
2 changes: 1 addition & 1 deletion src/jmh/java/com/tomfran/lsm/tree/LSMTreeBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void get(Blackhole bh) {
var value = tree.get(item.key());

bh.consume(value);

index = (index + 1) % NUM_ITEMS;
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/tomfran/lsm/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ public static void main(String[] args) throws IOException {
System.out.println(
"""
LSM Tree console
Commands:
- ins <key> <value> : insert a key-value pair
- get <key> : get a value for a key
- del <key> : delete a key-value pair
- exit : exit the application
"""
);

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/tomfran/lsm/io/BaseOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ byte[] intToVByte(int n) {

private byte[] longToVByte(long n) {
n++;

if (n <= 0) {
throw new IllegalArgumentException("n must be greater than 0");
}
Expand Down
7 changes: 1 addition & 6 deletions src/main/java/com/tomfran/lsm/memtable/Memtable.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.tomfran.lsm.memtable;

import com.tomfran.lsm.sstable.SSTable;
import com.tomfran.lsm.types.ByteArrayPair;

import java.util.Iterator;

public class Memtable implements Iterable<ByteArrayPair> {

SkipList list;

public Memtable() {
Expand All @@ -33,10 +32,6 @@ public int size() {
return list.size();
}

public SSTable flush(String filename, int sampleSize) {
return new SSTable(filename, list, sampleSize);
}

@Override
public Iterator<ByteArrayPair> iterator() {
return list.iterator();
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/com/tomfran/lsm/memtable/SkipList.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ public boolean hasNext() {

@Override
public ByteArrayPair next() {
var res = node.next[0].val;
if (node.next[0] == null)
return null;

ByteArrayPair res = node.next[0].val;
node = node.next[0];

return res;
Expand Down
62 changes: 14 additions & 48 deletions src/main/java/com/tomfran/lsm/sstable/SSTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.tomfran.lsm.io.BaseOutputStream;
import com.tomfran.lsm.types.ByteArrayPair;
import com.tomfran.lsm.utils.IteratorMerger;
import com.tomfran.lsm.utils.UniqueSortedIterator;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
Expand Down Expand Up @@ -36,9 +37,8 @@ public class SSTable implements Iterable<ByteArrayPair> {
* @param filename The filename to write the SSTable to.
* @param items The items to write to the SSTable, assumed to be sorted.
* @param sampleSize The number of items to skip between sparse index entries.
* @param numItems The number of items in the SSTable.
*/
public SSTable(String filename, Iterable<ByteArrayPair> items, int sampleSize) {
public SSTable(String filename, Iterator<ByteArrayPair> items, int sampleSize) {
this.filename = filename;
writeItems(filename, items, sampleSize);
is = new BaseInputStream(filename + DATA_FILE_EXTENSION);
Expand All @@ -63,11 +63,13 @@ public SSTable(String filename) {
* @return The merged SSTable.
*/
public static SSTable merge(String filename, int sampleSize, Iterable<ByteArrayPair>... tables) {
Iterator<ByteArrayPair>[] itArray = stream(tables).map(Iterable::iterator)
.toArray(Iterator[]::new);

SSTableMergerIterator it = new SSTableMergerIterator(stream(tables).map(Iterable::iterator)
.toArray(Iterator[]::new));
IteratorMerger<ByteArrayPair> merger = new IteratorMerger<>(itArray);
UniqueSortedIterator<ByteArrayPair> uniqueSortedIterator = new UniqueSortedIterator<>(merger);

return new SSTable(filename, it, sampleSize);
return new SSTable(filename, uniqueSortedIterator, sampleSize);
}

private void initializeFromDisk(String filename) {
Expand Down Expand Up @@ -193,7 +195,7 @@ else if (cmp > 0)
return low;
}

private void writeItems(String filename, Iterable<ByteArrayPair> items, int sampleSize) {
private void writeItems(String filename, Iterator<ByteArrayPair> items, int sampleSize) {
BaseOutputStream ios = new BaseOutputStream(filename + DATA_FILE_EXTENSION);

sparseOffsets = new LongArrayList();
Expand All @@ -204,7 +206,8 @@ private void writeItems(String filename, Iterable<ByteArrayPair> items, int samp
// write items and populate indexes
int size = 0;
long offset = 0L;
for (ByteArrayPair item : items) {
while (items.hasNext()) {
ByteArrayPair item = items.next();
if (size % sampleSize == 0) {
sparseOffsets.add(offset);
sparseSizeCount.add(size);
Expand All @@ -217,6 +220,10 @@ private void writeItems(String filename, Iterable<ByteArrayPair> items, int samp
}
ios.close();

if (size == 0) {
throw new IllegalArgumentException("Attempted to create an SSTable from an empty iterator");
}

this.size = size;

// write bloom filter and index to disk
Expand Down Expand Up @@ -279,45 +286,4 @@ public ByteArrayPair next() {

}

/**
* SSTableMergerIterator is an IteratorMerger that merges SSTables.
* <p>
* When merging SSTables, we want to skip over duplicate keys. This is done by
* keeping track of the last key we saw, and skipping over any keys that are
* equal to the last key.
*/
private static class SSTableMergerIterator extends IteratorMerger<ByteArrayPair> implements Iterable<ByteArrayPair> {

private ByteArrayPair last;

@SafeVarargs
public SSTableMergerIterator(Iterator<ByteArrayPair>... iterators) {
super(iterators);
last = super.next();
}

@Override
public boolean hasNext() {
return last != null;
}

@Override
public ByteArrayPair next() {
ByteArrayPair next = super.next();
while (next != null && last.compareTo(next) == 0)
next = super.next();

ByteArrayPair toReturn = last;
last = next;

return toReturn;
}

@Override
public Iterator<ByteArrayPair> iterator() {
return this;
}

}

}
6 changes: 3 additions & 3 deletions src/main/java/com/tomfran/lsm/tree/LSMTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public class LSMTree {
final Object immutableMemtablesLock = new Object();
final Object tableLock = new Object();

Memtable mutableMemtable;
final int mutableMemtableMaxSize;
final String dataDir;

Memtable mutableMemtable;
LinkedList<Memtable> immutableMemtables;
SSTable table;
ExecutorService memtableFlusher;
Expand Down Expand Up @@ -137,9 +137,9 @@ private void flushLastMemtable() {

synchronized (tableLock) {
if (table == null)
table = mutableMemtable.flush(filename, DEFAULT_SSTABLE_SAMPLE_SIZE);
table = new SSTable(filename, memtableToFlush.iterator(), DEFAULT_SSTABLE_SAMPLE_SIZE);
else {
var newTable = SSTable.merge(filename, DEFAULT_SSTABLE_SAMPLE_SIZE, memtableToFlush, table);
SSTable newTable = SSTable.merge(filename, DEFAULT_SSTABLE_SAMPLE_SIZE, memtableToFlush, table);
table.deleteFiles();
table = newTable;
}
Expand Down
39 changes: 39 additions & 0 deletions src/main/java/com/tomfran/lsm/utils/UniqueSortedIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.tomfran.lsm.utils;

import java.util.Iterator;

/**
* Skip duplicates in a sorted iterator by keeping only the first one.
* <p>
* Reads after the last element of the last Iterator will return null.
*
* @param <T> The type of the elements in the Iterators.
*/
public class UniqueSortedIterator<T extends Comparable<T>> implements Iterator<T> {

Iterator<T> iterator;
private T last;

public UniqueSortedIterator(Iterator<T> iterator) {
this.iterator = iterator;
last = iterator.next();
}

@Override
public boolean hasNext() {
return last != null;
}

@Override
public T next() {
T next = iterator.next();
while (next != null && last.compareTo(next) == 0)
next = iterator.next();

T toReturn = last;
last = next;

return toReturn;
}

}
53 changes: 53 additions & 0 deletions src/test/java/com/tomfran/lsm/memtable/SkipListIteratorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.tomfran.lsm.memtable;

import com.tomfran.lsm.comparator.ByteArrayComparator;
import com.tomfran.lsm.types.ByteArrayPair;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.List;

import static com.tomfran.lsm.TestUtils.getRandomPair;
import static com.tomfran.lsm.comparator.ByteArrayComparator.compare;

public class SkipListIteratorTest {

static int NUM_ITEMS = 10;
static SkipList list;
static List<ByteArrayPair> items;

@BeforeAll
static void setup() {

list = new SkipList(NUM_ITEMS);

// generate random items
var l = new ObjectOpenHashSet<ByteArrayPair>();
for (int i = 0; i < NUM_ITEMS; i++) {
l.add(getRandomPair());
}

items = l.stream()
.sorted((a, b) -> ByteArrayComparator.compare(a.key(), b.key()))
.toList();

items.forEach(list::add);
}

@Test
public void iteratorTest() {
var it = list.iterator();
var it2 = items.iterator();

while (it.hasNext()) {
var a = it.next();
var b = it2.next();
assert compare(a.key(), b.key()) == 0;
assert compare(a.value(), b.value()) == 0;
}

assert !it2.hasNext();
}

}
11 changes: 11 additions & 0 deletions src/test/java/com/tomfran/lsm/memtable/SkipListTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ public void shouldFind() {

@Test
@Order(2)
public void iteratorTest() {
var it = l.iterator();
while (it.hasNext()) {
it.next();
}
assert it.next() == null;
}

@Test
@Order(3)
public void shouldRemove() {
for (ByteArrayPair item : items.subList(0, 50)) {
l.remove(item.key());
Expand All @@ -60,4 +70,5 @@ public void shouldRemove() {
}
}


}
8 changes: 3 additions & 5 deletions src/test/java/com/tomfran/lsm/sstable/SSTableMergeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

public class SSTableMergeTest {

static final String MERGE_FILE = "/merge", TABLE_FILE = "/test";
static final String MERGE_FILE = "/merge", TABLE_FILE2 = "/test2";
@TempDir
static Path tempDirectory;
static Memtable first;
static SSTable merge, second;
static List<ByteArrayPair> firstItems, secondItems, expectedItems;

Expand All @@ -40,10 +39,9 @@ public static void setup() {
expectedItems.addAll(firstItems);
secondItems.stream().skip(n / 2).forEach(expectedItems::add);


first = new Memtable();
var first = new Memtable();
firstItems.forEach(first::add);
second = new SSTable(tempDirectory + TABLE_FILE, secondItems, 100);
second = new SSTable(tempDirectory + TABLE_FILE2, secondItems.iterator(), 100);

merge = SSTable.merge(tempDirectory + MERGE_FILE, 100, first, second);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static void setup() throws IOException {
.sorted((a, b) -> compare(a.key(), b.key()))
.toList();

t1 = new SSTable(tempDirectory + FILE1, items, 3);
t1 = new SSTable(tempDirectory + FILE1, items.iterator(), 3);

for (var end : List.of(INDEX_FILE_EXTENSION, DATA_FILE_EXTENSION, BLOOM_FILE_EXTENSION))
Files.copy(Path.of(tempDirectory + FILE1 + end), Path.of(tempDirectory + FILE2 + end));
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/com/tomfran/lsm/sstable/SSTableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void setup() {
skipped.add(e);
}

t = new SSTable(tempDirectory + TEST_FILE, inserted, SAMPLE_SIZE);
t = new SSTable(tempDirectory + TEST_FILE, inserted.iterator(), SAMPLE_SIZE);
}

@AfterAll
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/com/tomfran/lsm/tree/LSMTreeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ public void writeFlow() throws InterruptedException {

Object2ObjectArrayMap<byte[], byte[]> items = new Object2ObjectArrayMap<>();

IntStream.range(0, 5 * maxSize).forEach(i -> {
IntStream.range(0, 2 * maxSize).forEach(i -> {
var it = getRandomPair();
tree.add(it);
items.put(it.key(), it.value());
});

Thread.sleep(5000);

Thread.sleep(2000);

for (var it : items.entrySet())
assert compare(tree.get(it.getKey()), it.getValue()) == 0;
Expand Down
Loading

0 comments on commit ff6271a

Please sign in to comment.