Skip to content

Commit

Permalink
WIP - To get some early feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jaykorean committed Dec 15, 2023
1 parent c96d9a0 commit ae682f2
Show file tree
Hide file tree
Showing 13 changed files with 412 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ set(SOURCES
db/memtable_list.cc
db/merge_helper.cc
db/merge_operator.cc
db/multi_cf_iterator.cc
db/output_validator.cc
db/periodic_task_scheduler.cc
db/range_del_aggregator.cc
Expand Down Expand Up @@ -1348,6 +1349,7 @@ if(WITH_TESTS)
db/memtable_list_test.cc
db/merge_helper_test.cc
db/merge_test.cc
db/multi_cf_iterator_test.cc
db/options_file_test.cc
db/perf_context_test.cc
db/periodic_task_scheduler_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,9 @@ wal_edit_test: $(OBJ_DIR)/db/wal_edit_test.o $(TEST_LIBRARY) $(LIBRARY)
dbformat_test: $(OBJ_DIR)/db/dbformat_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

multi_cf_iterator_test: $(OBJ_DIR)/db/multi_cf_iterator_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

env_basic_test: $(OBJ_DIR)/env/env_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/memtable_list.cc",
"db/merge_helper.cc",
"db/merge_operator.cc",
"db/multi_cf_iterator.cc",
"db/output_validator.cc",
"db/periodic_task_scheduler.cc",
"db/range_del_aggregator.cc",
Expand Down Expand Up @@ -5226,6 +5227,12 @@ cpp_unittest_wrapper(name="mock_env_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="multi_cf_iterator_test",
srcs=["db/multi_cf_iterator_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="object_registry_test",
srcs=["utilities/object_registry_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
21 changes: 21 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/multi_cf_iterator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
Expand Down Expand Up @@ -3934,6 +3935,26 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
return db_iter;
}

MultiCfIterator* DBImpl::NewMultiCfIterator(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families) {
assert(column_families.size() > 0);

// Use the key comparator from the first CF
auto first_cfh =
static_cast_with_check<ColumnFamilyHandleImpl>(column_families[0]);
ColumnFamilyData* first_cfd = first_cfh->cfd();

std::vector<Iterator*> child_iterators;
Status s = NewIterators(_read_options, column_families, &child_iterators);
if (s.ok()) {
return NewMultiColumnFamilyIterator(first_cfd->user_comparator(),
column_families, child_iterators);
}
// TODO - return something like NewErrorIterator?
return nullptr;
}

Status DBImpl::NewIterators(
const ReadOptions& _read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
Expand Down
8 changes: 8 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ class DBImpl : public DB {
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;

// DO NOT USE, UNDER CONSTRUCTION
// Returns a cross-column-family iterator from a consistent database state.
// When the same key exists in more than one column families, this iterates in
// the order that column family is provided in column_families
virtual MultiCfIterator* NewMultiCfIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override;

virtual const Snapshot* GetSnapshot() override;
virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
// Create a timestamped snapshot. This snapshot can be shared by multiple
Expand Down
8 changes: 8 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3197,6 +3197,14 @@ class ModelDB : public DB {
std::vector<Iterator*>* /*iterators*/) override {
return Status::NotSupported("Not supported yet");
}

// DO NOT USE, UNDER CONSTRUCTION
MultiCfIterator* NewMultiCfIterator(
const ReadOptions& /*options*/,
const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
return nullptr;
}

const Snapshot* GetSnapshot() override {
ModelSnapshot* snapshot = new ModelSnapshot;
snapshot->map_ = map_;
Expand Down
41 changes: 41 additions & 0 deletions db/multi_cf_iterator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "rocksdb/multi_cf_iterator.h"

namespace ROCKSDB_NAMESPACE {

MultiCfIterator::MultiCfIterator(
const Comparator* comparator,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators)
: comparator_(comparator) {
assert(column_families.size() > 0);
assert(column_families.size() == child_iterators.size());

cfhs_.reserve(column_families.size());
iterators_.reserve(column_families.size());
for (size_t i = 0; i < column_families.size(); ++i) {
cfhs_.push_back(column_families[i]);
iterators_.push_back(child_iterators[i]);
}
}
MultiCfIterator::~MultiCfIterator() {
for (auto iter : iterators_) {
delete iter;
}
status_.PermitUncheckedError();
}

MultiCfIterator* NewMultiColumnFamilyIterator(
const Comparator* comparator,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<Iterator*>& child_iterators) {
MultiCfIterator* iterator =
new MultiCfIterator(comparator, column_families, child_iterators);
return iterator;
}

} // namespace ROCKSDB_NAMESPACE
202 changes: 202 additions & 0 deletions db/multi_cf_iterator_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/db_test_util.h"

namespace ROCKSDB_NAMESPACE {

class MultiCfIteratorTest : public DBTestBase {
public:
MultiCfIteratorTest()
: DBTestBase("multi_cf_iterator_test", /*env_do_fsync=*/true) {}
};

TEST_F(MultiCfIteratorTest, SimpleValues) {
Options options = GetDefaultOptions();
auto verify = [&](const std::vector<ColumnFamilyHandle*>& cfhs,
const std::vector<Slice>& expected_keys,
const std::vector<Slice>& expected_values) {
int i = 0;
MultiCfIterator* iter = db_->NewMultiCfIterator(ReadOptions(), cfhs);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(expected_keys[i], iter->key());
ASSERT_EQ(expected_values[i], iter->value());
++i;
}
delete iter;
// TODO - Check when implementation is done
// ASSERT_EQ(expected_keys.size(), i);
};

{
// Case 1: Unique key per CF
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);

ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val"));
ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val"));
ASSERT_OK(Put(2, "key_3", "key_3_cf_2_val"));
ASSERT_OK(Put(3, "key_4", "key_4_cf_3_val"));

std::vector<Slice> expected_keys = {"key_1", "key_2", "key_3", "key_4"};
std::vector<Slice> expected_values = {"key_1_cf_0_val", "key_2_cf_1_val",
"key_3_cf_2_val", "key_4_cf_3_val"};

// Test for iteration over CF default->1->2->3
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
verify(cfhs_order_0_1_2_3, expected_keys, expected_values);

// Test for iteration over CF 3->1->default_cf->2
std::vector<ColumnFamilyHandle*> cfhs_order_3_1_0_2 = {
handles_[3], handles_[1], handles_[0], handles_[2]};
// Iteration order and the return values should be the same since keys are
// unique per CF
verify(cfhs_order_3_1_0_2, expected_keys, expected_values);
}
{
// Case 2: Same key in multiple CFs
options = CurrentOptions(options);
DestroyAndReopen(options);
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);

ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val"));
ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val"));
ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val"));
ASSERT_OK(Put(2, "key_2", "key_2_cf_2_val"));
ASSERT_OK(Put(0, "key_3", "key_3_cf_0_val"));
ASSERT_OK(Put(1, "key_3", "key_3_cf_1_val"));
ASSERT_OK(Put(3, "key_3", "key_3_cf_3_val"));

std::vector<Slice> expected_keys = {"key_1", "key_2", "key_3"};

// Test for iteration over CF default->1->2->3
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
// Pick what DBIter would return for value() in the first CF that key exists
// Since value for kDefaultWideColumnName only exists for key_1, rest will
// return empty value
std::vector<Slice> expected_values = {"key_1_cf_0_val", "key_2_cf_1_val",
"key_3_cf_0_val"};
verify(cfhs_order_0_1_2_3, expected_keys, expected_values);

// Test for iteration over CF 3->2->default_cf->1
std::vector<ColumnFamilyHandle*> cfhs_order_3_2_0_1 = {
handles_[3], handles_[2], handles_[0], handles_[1]};
expected_values = {"key_1_cf_3_val", "key_2_cf_2_val", "key_3_cf_3_val"};
verify(cfhs_order_3_2_0_1, expected_keys, expected_values);
}
}

TEST_F(MultiCfIteratorTest, IterateAttributeGroups) {
// Set up the DB and Column Families
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);

constexpr char key_1[] = "key_1";
WideColumns key_1_columns_in_cf_2{
{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"}};
WideColumns key_1_columns_in_cf_3{
{"cf_3_col_name_1", "cf_3_col_val_1_key_1"},
{"cf_3_col_name_2", "cf_3_col_val_2_key_1"},
{"cf_3_col_name_3", "cf_3_col_val_3_key_1"}};

constexpr char key_2[] = "key_2";
WideColumns key_2_columns_in_cf_1{
{"cf_1_col_name_1", "cf_1_col_val_1_key_2"}};
WideColumns key_2_columns_in_cf_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_2"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_2"}};

constexpr char key_3[] = "key_3";
WideColumns key_3_columns_in_cf_1{
{"cf_1_col_name_1", "cf_1_col_val_1_key_3"}};
WideColumns key_3_columns_in_cf_3{
{"cf_3_col_name_1", "cf_3_col_val_1_key_3"}};

constexpr char key_4[] = "key_4";
WideColumns key_4_columns_in_cf_0{
{"cf_0_col_name_1", "cf_0_col_val_1_key_4"}};
WideColumns key_4_columns_in_cf_2{
{"cf_2_col_name_1", "cf_2_col_val_1_key_4"}};

AttributeGroups key_1_attribute_groups{
AttributeGroup(handles_[2], key_1_columns_in_cf_2),
AttributeGroup(handles_[3], key_1_columns_in_cf_3)};
AttributeGroups key_2_attribute_groups{
AttributeGroup(handles_[1], key_2_columns_in_cf_1),
AttributeGroup(handles_[2], key_2_columns_in_cf_2)};
AttributeGroups key_3_attribute_groups{
AttributeGroup(handles_[1], key_3_columns_in_cf_1),
AttributeGroup(handles_[3], key_3_columns_in_cf_3)};
AttributeGroups key_4_attribute_groups{
AttributeGroup(handles_[0], key_4_columns_in_cf_0),
AttributeGroup(handles_[2], key_4_columns_in_cf_2)};

ASSERT_OK(db_->PutEntity(WriteOptions(), key_1, key_1_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_2, key_2_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups));
ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups));

auto verify =
[&](const std::vector<ColumnFamilyHandle*>& cfhs,
const std::vector<Slice>& expected_keys,
const std::vector<Slice>& expected_values,
const std::vector<WideColumns>& expected_wide_columns,
const std::vector<AttributeGroups>& expected_attribute_groups) {
int i = 0;
MultiCfIterator* iter = db_->NewMultiCfIterator(ReadOptions(), cfhs);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_EQ(expected_keys[i], iter->key());
ASSERT_EQ(expected_values[i], iter->value());
ASSERT_EQ(expected_wide_columns[i], iter->columns());
ASSERT_EQ(expected_attribute_groups[i], iter->attribute_groups());
++i;
}
delete iter;
// TODO - Check when implementation is done
// ASSERT_EQ(expected_keys.size(), i);
};

// Test for iteration over CF default->1->2->3
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
std::vector<Slice> expected_keys = {key_1, key_2, key_3, key_4};
// Pick what DBIter would return for value() in the first CF that key exists
// Since value for kDefaultWideColumnName only exists for key_1, rest will
// return empty value
std::vector<Slice> expected_values = {"cf_2_col_val_0_key_1", "", "", ""};

// Merge columns from all CFs that key exists and value is stored as wide
// column
std::vector<WideColumns> expected_wide_columns = {
{{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_1"},
{"cf_3_col_name_1", "cf_3_col_val_1_key_1"},
{"cf_3_col_name_2", "cf_3_col_val_2_key_1"},
{"cf_3_col_name_3", "cf_3_col_val_3_key_1"}},
{{"cf_1_col_name_1", "cf_1_col_val_1_key_2"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_2"},
{"cf_2_col_name_2", "cf_2_col_val_2_key_2"}},
{{"cf_1_col_name_1", "cf_1_col_val_1_key_3"},
{"cf_3_col_name_1", "cf_3_col_val_1_key_3"}},
{{"cf_0_col_name_1", "cf_0_col_val_1_key_4"},
{"cf_2_col_name_1", "cf_2_col_val_1_key_4"}}};
std::vector<AttributeGroups> expected_attribute_groups = {
key_1_attribute_groups, key_2_attribute_groups, key_3_attribute_groups,
key_4_attribute_groups};
verify(cfhs_order_0_1_2_3, expected_keys, expected_values,
expected_wide_columns, expected_attribute_groups);
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
9 changes: 9 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rocksdb/iterator.h"
#include "rocksdb/listener.h"
#include "rocksdb/metadata.h"
#include "rocksdb/multi_cf_iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/sst_file_writer.h"
Expand Down Expand Up @@ -969,6 +970,14 @@ class DB {
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) = 0;

// DO NOT USE, UNDER CONSTRUCTION
// Returns a cross-column-family iterator from a consistent database state.
// When the same key exists in more than one column families, this iterates in
// the order that column family is provided in column_families
virtual MultiCfIterator* NewMultiCfIterator(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) = 0;

// Return a handle to the current DB state. Iterators created with
// this handle will all observe a stable snapshot of the current DB
// state. The caller must call ReleaseSnapshot(result) when the
Expand Down
Loading

0 comments on commit ae682f2

Please sign in to comment.