diff --git a/CMakeLists.txt b/CMakeLists.txt index 23a4014bc08f..d5ee52c8712a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -696,6 +696,7 @@ set(SOURCES db/memtable_list.cc db/merge_helper.cc db/merge_operator.cc + db/multi_cf_iterator.cc db/output_validator.cc db/periodic_task_scheduler.cc db/range_del_aggregator.cc @@ -1348,6 +1349,7 @@ if(WITH_TESTS) db/memtable_list_test.cc db/merge_helper_test.cc db/merge_test.cc + db/multi_cf_iterator_test.cc db/options_file_test.cc db/perf_context_test.cc db/periodic_task_scheduler_test.cc diff --git a/Makefile b/Makefile index 50dddc976061..cf2c580edd09 100644 --- a/Makefile +++ b/Makefile @@ -1639,6 +1639,9 @@ wal_edit_test: $(OBJ_DIR)/db/wal_edit_test.o $(TEST_LIBRARY) $(LIBRARY) dbformat_test: $(OBJ_DIR)/db/dbformat_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +multi_cf_iterator_test: $(OBJ_DIR)/db/multi_cf_iterator_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + env_basic_test: $(OBJ_DIR)/env/env_basic_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index e8aaf325d464..73ddeabc310c 100644 --- a/TARGETS +++ b/TARGETS @@ -83,6 +83,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/memtable_list.cc", "db/merge_helper.cc", "db/merge_operator.cc", + "db/multi_cf_iterator.cc", "db/output_validator.cc", "db/periodic_task_scheduler.cc", "db/range_del_aggregator.cc", @@ -5226,6 +5227,12 @@ cpp_unittest_wrapper(name="mock_env_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="multi_cf_iterator_test", + srcs=["db/multi_cf_iterator_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="object_registry_test", srcs=["utilities/object_registry_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 297c6aceb764..1d40a6f334ca 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -77,6 +77,7 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" 
+#include "rocksdb/multi_cf_iterator.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/stats_history.h"
 #include "rocksdb/status.h"
@@ -3934,6 +3935,36 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(
   return db_iter;
 }
 
+// DO NOT USE, UNDER CONSTRUCTION
+// Creates one child iterator per column family via NewIterators() and wraps
+// them in a MultiCfIterator that uses the first column family's user
+// comparator. Returns nullptr if column_families is empty or NewIterators()
+// fails.
+MultiCfIterator* DBImpl::NewMultiCfIterator(
+    const ReadOptions& _read_options,
+    const std::vector<ColumnFamilyHandle*>& column_families) {
+  assert(!column_families.empty());
+  if (column_families.empty()) {
+    // The assert is compiled out of release builds; without this guard the
+    // column_families[0] access below would be undefined behavior.
+    return nullptr;
+  }
+
+  // Use the key comparator from the first CF
+  auto first_cfh =
+      static_cast_with_check<ColumnFamilyHandleImpl>(column_families[0]);
+  ColumnFamilyData* first_cfd = first_cfh->cfd();
+
+  std::vector<Iterator*> child_iterators;
+  Status s = NewIterators(_read_options, column_families, &child_iterators);
+  if (!s.ok()) {
+    // TODO - return something like NewErrorIterator?
+    return nullptr;
+  }
+  return NewMultiColumnFamilyIterator(first_cfd->user_comparator(),
+                                      column_families, child_iterators);
+}
+
 Status DBImpl::NewIterators(
     const ReadOptions& _read_options,
     const std::vector<ColumnFamilyHandle*>& column_families,
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 34a5f33989c4..73b76f14148b 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -351,6 +351,14 @@ class DBImpl : public DB {
       const std::vector<ColumnFamilyHandle*>& column_families,
       std::vector<Iterator*>* iterators) override;
 
+  // DO NOT USE, UNDER CONSTRUCTION
+  // Returns a cross-column-family iterator from a consistent database state.
+  // When the same key exists in more than one column family, entries are
+  // visited in the order the column families appear in column_families.
+  virtual MultiCfIterator* NewMultiCfIterator(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>& column_families) override;
+
   virtual const Snapshot* GetSnapshot() override;
   virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
   // Create a timestamped snapshot.
This snapshot can be shared by multiple
diff --git a/db/db_test.cc b/db/db_test.cc
index 99a03b1509a2..35daea2050d1 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -3197,6 +3197,14 @@ class ModelDB : public DB {
       std::vector<Iterator*>* /*iterators*/) override {
     return Status::NotSupported("Not supported yet");
   }
+
+  // DO NOT USE, UNDER CONSTRUCTION (not modeled here; always returns nullptr)
+  MultiCfIterator* NewMultiCfIterator(
+      const ReadOptions& /*options*/,
+      const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
+    return nullptr;
+  }
+
   const Snapshot* GetSnapshot() override {
     ModelSnapshot* snapshot = new ModelSnapshot;
     snapshot->map_ = map_;
diff --git a/db/multi_cf_iterator.cc b/db/multi_cf_iterator.cc
new file mode 100644
index 000000000000..96807cd3d626
--- /dev/null
+++ b/db/multi_cf_iterator.cc
@@ -0,0 +1,41 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/multi_cf_iterator.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Takes ownership of child_iterators (deleted in the destructor).
+// column_families[i] is the column family that child_iterators[i] reads.
+MultiCfIterator::MultiCfIterator(
+    const Comparator* comparator,
+    const std::vector<ColumnFamilyHandle*>& column_families,
+    const std::vector<Iterator*>& child_iterators)
+    : cfhs_(column_families.begin(), column_families.end()),
+      iterators_(child_iterators.begin(), child_iterators.end()),
+      comparator_(comparator),
+      min_heap_(MultiCfMinHeapItemComparator(comparator_)) {
+  assert(!column_families.empty());
+  assert(column_families.size() == child_iterators.size());
+}
+
+MultiCfIterator::~MultiCfIterator() {
+  // The child iterators were handed over to this object at construction.
+  for (Iterator* iter : iterators_) {
+    delete iter;
+  }
+  status_.PermitUncheckedError();
+}
+
+// Heap-allocates a MultiCfIterator from the given arguments; the result is
+// returned as a raw pointer, so the caller is responsible for deleting it.
+MultiCfIterator* NewMultiColumnFamilyIterator(
+    const Comparator* comparator,
+    const std::vector<ColumnFamilyHandle*>& column_families,
+    const std::vector<Iterator*>& child_iterators) {
+  return new MultiCfIterator(comparator, column_families, child_iterators);
+}
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/db/multi_cf_iterator_test.cc b/db/multi_cf_iterator_test.cc
new file mode 100644
index 000000000000..4a3c42fb56c3
--- /dev/null
+++ b/db/multi_cf_iterator_test.cc
@@ -0,0 +1,202 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "db/db_test_util.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class MultiCfIteratorTest : public DBTestBase {
+ public:
+  MultiCfIteratorTest()
+      : DBTestBase("multi_cf_iterator_test", /*env_do_fsync=*/true) {}
+};
+
+TEST_F(MultiCfIteratorTest, SimpleValues) {
+  Options options = GetDefaultOptions();
+  auto verify = [&](const std::vector<ColumnFamilyHandle*>& cfhs,
+                    const std::vector<Slice>& expected_keys,
+                    const std::vector<Slice>& expected_values) {
+    size_t i = 0;
+    MultiCfIterator* iter = db_->NewMultiCfIterator(ReadOptions(), cfhs);
+    // NewMultiCfIterator() reports failure by returning nullptr
+    ASSERT_NE(iter, nullptr);
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      ASSERT_LT(i, expected_keys.size());
+      ASSERT_EQ(expected_keys[i], iter->key());
+      ASSERT_EQ(expected_values[i], iter->value());
+      ++i;
+    }
+    delete iter;
+    // TODO - Check when implementation is done
+    // ASSERT_EQ(expected_keys.size(), i);
+  };
+
+  {
+    // Case 1: Unique key per CF
+    CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
+
+    ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val"));
+    ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val"));
+    ASSERT_OK(Put(2, "key_3", "key_3_cf_2_val"));
+    ASSERT_OK(Put(3, "key_4", "key_4_cf_3_val"));
+
+    std::vector<Slice> expected_keys = {"key_1", "key_2", "key_3", "key_4"};
+    std::vector<Slice> expected_values = {"key_1_cf_0_val", "key_2_cf_1_val",
+                                          "key_3_cf_2_val", "key_4_cf_3_val"};
+
+    // Test for iteration over CF default->1->2->3
+    std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
+        handles_[0], handles_[1], handles_[2], handles_[3]};
+    verify(cfhs_order_0_1_2_3, expected_keys, expected_values);
+
+    // Test for iteration over CF 3->1->default_cf->2
+    std::vector<ColumnFamilyHandle*> cfhs_order_3_1_0_2 = {
+        handles_[3], handles_[1], handles_[0], handles_[2]};
+    // Keys are unique per CF, so the CF order must not change the result
+    verify(cfhs_order_3_1_0_2, expected_keys, expected_values);
+  }
+  {
+    // Case 2: Same key in multiple CFs
+    options = CurrentOptions(options);
+    DestroyAndReopen(options);
+    CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
+
+    ASSERT_OK(Put(0, "key_1", "key_1_cf_0_val"));
+    ASSERT_OK(Put(3, "key_1", "key_1_cf_3_val"));
+    ASSERT_OK(Put(1, "key_2", "key_2_cf_1_val"));
+    ASSERT_OK(Put(2, "key_2", "key_2_cf_2_val"));
+    ASSERT_OK(Put(0, "key_3", "key_3_cf_0_val"));
+    ASSERT_OK(Put(1, "key_3", "key_3_cf_1_val"));
+    ASSERT_OK(Put(3, "key_3", "key_3_cf_3_val"));
+
+    std::vector<Slice> expected_keys = {"key_1", "key_2", "key_3"};
+
+    // Test for iteration over CF default->1->2->3
+    std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
+        handles_[0], handles_[1], handles_[2], handles_[3]};
+    // value() should come from the first CF (in cfhs order) that has the key
+    std::vector<Slice> expected_values = {"key_1_cf_0_val", "key_2_cf_1_val",
+                                          "key_3_cf_0_val"};
+    verify(cfhs_order_0_1_2_3, expected_keys, expected_values);
+
+    // Test for iteration over CF 3->2->default_cf->1
+    std::vector<ColumnFamilyHandle*> cfhs_order_3_2_0_1 = {
+        handles_[3], handles_[2], handles_[0], handles_[1]};
+    expected_values = {"key_1_cf_3_val", "key_2_cf_2_val", "key_3_cf_3_val"};
+    verify(cfhs_order_3_2_0_1, expected_keys, expected_values);
+  }
+}
+
+TEST_F(MultiCfIteratorTest, IterateAttributeGroups) {
+  // Set up the DB and Column Families
+  Options options = GetDefaultOptions();
+  CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
+
+  constexpr char key_1[] = "key_1";
+  WideColumns key_1_columns_in_cf_2{
+      {kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
+      {"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
+      {"cf_2_col_name_2", "cf_2_col_val_2_key_1"}};
+  WideColumns key_1_columns_in_cf_3{
+      {"cf_3_col_name_1", "cf_3_col_val_1_key_1"},
+      {"cf_3_col_name_2", "cf_3_col_val_2_key_1"},
+      {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}};
+
+  constexpr char key_2[] = "key_2";
+  WideColumns key_2_columns_in_cf_1{
+      {"cf_1_col_name_1", "cf_1_col_val_1_key_2"}};
+  WideColumns key_2_columns_in_cf_2{
+      {"cf_2_col_name_1", "cf_2_col_val_1_key_2"},
+      {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}};
+
+  constexpr char key_3[] = "key_3";
+  WideColumns key_3_columns_in_cf_1{
+      {"cf_1_col_name_1", "cf_1_col_val_1_key_3"}};
+  WideColumns key_3_columns_in_cf_3{
+      {"cf_3_col_name_1", "cf_3_col_val_1_key_3"}};
+
+  constexpr char key_4[] = "key_4";
+  WideColumns key_4_columns_in_cf_0{
+      {"cf_0_col_name_1", "cf_0_col_val_1_key_4"}};
+  WideColumns key_4_columns_in_cf_2{
+      {"cf_2_col_name_1", "cf_2_col_val_1_key_4"}};
+
+  AttributeGroups key_1_attribute_groups{
+      AttributeGroup(handles_[2], key_1_columns_in_cf_2),
+      AttributeGroup(handles_[3], key_1_columns_in_cf_3)};
+  AttributeGroups key_2_attribute_groups{
+      AttributeGroup(handles_[1], key_2_columns_in_cf_1),
+      AttributeGroup(handles_[2], key_2_columns_in_cf_2)};
+  AttributeGroups key_3_attribute_groups{
+      AttributeGroup(handles_[1], key_3_columns_in_cf_1),
+      AttributeGroup(handles_[3], key_3_columns_in_cf_3)};
+  AttributeGroups key_4_attribute_groups{
+      AttributeGroup(handles_[0], key_4_columns_in_cf_0),
+      AttributeGroup(handles_[2], key_4_columns_in_cf_2)};
+
+  ASSERT_OK(db_->PutEntity(WriteOptions(), key_1, key_1_attribute_groups));
+  ASSERT_OK(db_->PutEntity(WriteOptions(), key_2, key_2_attribute_groups));
+  ASSERT_OK(db_->PutEntity(WriteOptions(), key_3, key_3_attribute_groups));
+  ASSERT_OK(db_->PutEntity(WriteOptions(), key_4, key_4_attribute_groups));
+
+  auto verify =
+      [&](const std::vector<ColumnFamilyHandle*>& cfhs,
+          const std::vector<Slice>& expected_keys,
+          const std::vector<Slice>& expected_values,
+          const std::vector<WideColumns>& expected_wide_columns,
+          const std::vector<AttributeGroups>& expected_attribute_groups) {
+        size_t i = 0;
+        MultiCfIterator* iter = db_->NewMultiCfIterator(ReadOptions(), cfhs);
+        ASSERT_NE(iter, nullptr);
+        for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+          ASSERT_LT(i, expected_keys.size());
+          ASSERT_EQ(expected_keys[i], iter->key());
+          ASSERT_EQ(expected_values[i], iter->value());
+          ASSERT_EQ(expected_wide_columns[i], iter->columns());
+          ASSERT_EQ(expected_attribute_groups[i], iter->attribute_groups());
+          ++i;
+        }
+        delete iter;
+        // TODO - Check when implementation is done
+        // ASSERT_EQ(expected_keys.size(), i);
+      };
+
+  // Test for iteration over CF default->1->2->3
+  std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
+      handles_[0], handles_[1], handles_[2], handles_[3]};
+  std::vector<Slice> expected_keys = {key_1, key_2, key_3, key_4};
+  // value() is the kDefaultWideColumnName column of the first CF holding the
+  // key; only key_1 has such a column, so the remaining values are empty
+  std::vector<Slice> expected_values = {"cf_2_col_val_0_key_1", "", "", ""};
+
+  // columns() merges the wide columns of every CF in which the key exists
+  std::vector<WideColumns> expected_wide_columns = {
+      {{kDefaultWideColumnName, "cf_2_col_val_0_key_1"},
+       {"cf_2_col_name_1", "cf_2_col_val_1_key_1"},
+       {"cf_2_col_name_2", "cf_2_col_val_2_key_1"},
+       {"cf_3_col_name_1", "cf_3_col_val_1_key_1"},
+       {"cf_3_col_name_2", "cf_3_col_val_2_key_1"},
+       {"cf_3_col_name_3", "cf_3_col_val_3_key_1"}},
+      {{"cf_1_col_name_1", "cf_1_col_val_1_key_2"},
+       {"cf_2_col_name_1", "cf_2_col_val_1_key_2"},
+       {"cf_2_col_name_2", "cf_2_col_val_2_key_2"}},
+      {{"cf_1_col_name_1", "cf_1_col_val_1_key_3"},
+       {"cf_3_col_name_1", "cf_3_col_val_1_key_3"}},
+      {{"cf_0_col_name_1", "cf_0_col_val_1_key_4"},
+       {"cf_2_col_name_1", "cf_2_col_val_1_key_4"}}};
+  std::vector<AttributeGroups> expected_attribute_groups = {
+      key_1_attribute_groups, key_2_attribute_groups, key_3_attribute_groups,
+      key_4_attribute_groups};
+  verify(cfhs_order_0_1_2_3, expected_keys, expected_values,
+         expected_wide_columns, expected_attribute_groups);
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 5ae73182b60b..0173c966b7d8 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -21,6 +21,7 @@
 #include "rocksdb/iterator.h"
 #include "rocksdb/listener.h"
 #include "rocksdb/metadata.h"
+#include "rocksdb/multi_cf_iterator.h"
 #include "rocksdb/options.h"
#include "rocksdb/snapshot.h"
 #include "rocksdb/sst_file_writer.h"
@@ -969,6 +970,14 @@ class DB {
       const std::vector<ColumnFamilyHandle*>& column_families,
       std::vector<Iterator*>* iterators) = 0;
 
+  // DO NOT USE, UNDER CONSTRUCTION
+  // Returns a cross-column-family iterator from a consistent database state.
+  // When the same key exists in more than one column family, entries are
+  // visited in the order the column families appear in column_families.
+  virtual MultiCfIterator* NewMultiCfIterator(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>& column_families) = 0;
+
   // Return a handle to the current DB state. Iterators created with
   // this handle will all observe a stable snapshot of the current DB
   // state. The caller must call ReleaseSnapshot(result) when the
diff --git a/include/rocksdb/multi_cf_iterator.h b/include/rocksdb/multi_cf_iterator.h
new file mode 100644
index 000000000000..4cda6404c110
--- /dev/null
+++ b/include/rocksdb/multi_cf_iterator.h
@@ -0,0 +1,97 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/status.h"
+#include "rocksdb/wide_columns.h"
+#include "util/heap.h"  // NOTE(review): internal header used by a public one
+#include "wide_columns.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class ColumnFamilyHandle;
+
+// UNDER CONSTRUCTION. Iterator spanning several column families. Owns the
+// child iterators it is given and deletes them in its destructor.
+class MultiCfIterator : public Iterator {
+ public:
+  MultiCfIterator(const Comparator* comparator,
+                  const std::vector<ColumnFamilyHandle*>& column_families,
+                  const std::vector<Iterator*>& child_iterators);
+  ~MultiCfIterator() override;
+
+  // No copy allowed
+  MultiCfIterator(const MultiCfIterator&) = delete;
+  MultiCfIterator& operator=(const MultiCfIterator&) = delete;
+
+  Slice key() const override {
+    assert(Valid());
+    return min_heap_.top()->key();
+  }
+  bool Valid() const override { return !min_heap_.empty() && status_.ok(); }
+  Status status() const override { return status_; }
+
+  // TODO - Implement these
+  void Seek(const Slice& /*target*/) override {}
+  void SeekForPrev(const Slice& /*target*/) override {}
+  void SeekToFirst() override {}
+  void SeekToLast() override {}
+  void Next() override { assert(false); }
+  void Prev() override { assert(false); }
+  Slice value() const override {
+    assert(false);
+    // TODO - Pick from the first CF that has the value for the key
+    return value_;
+  }
+  const WideColumns& columns() const override {
+    assert(false);
+    // TODO - Lazily merge columns from child iterators
+    return wide_columns_;
+  }
+  const AttributeGroups& attribute_groups() const {
+    assert(false);
+    // TODO - Lazily populate attribute groups from child iterators
+    return attribute_groups_;
+  }
+
+ private:
+  std::vector<ColumnFamilyHandle*> cfhs_;
+  std::vector<Iterator*> iterators_;
+  ReadOptions read_options_;
+  Status status_;
+
+  Slice value_;
+  WideColumns wide_columns_;
+  AttributeGroups attribute_groups_;
+
+  class MultiCfMinHeapItemComparator {
+   public:
+    MultiCfMinHeapItemComparator() = default;
+    explicit MultiCfMinHeapItemComparator(const Comparator* comparator)
+        : comparator_(comparator) {}
+    // True when a's key sorts after b's, i.e. inverted order for the min-heap.
+    bool operator()(const Iterator* a, const Iterator* b) const {
+      return comparator_->Compare(a->key(), b->key()) > 0;
+    }
+
+   private:
+    const Comparator* comparator_ = nullptr;
+  };
+
+  const Comparator* comparator_;
+  using MultiCfMinHeap = BinaryHeap<Iterator*, MultiCfMinHeapItemComparator>;
+  MultiCfMinHeap min_heap_;
+  // TODO: MaxHeap for Reverse Iteration
+};
+extern MultiCfIterator* NewMultiColumnFamilyIterator(
+    const Comparator* comparator,
+    const std::vector<ColumnFamilyHandle*>& column_families,
+    const std::vector<Iterator*>& child_iterators);
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h
index 86e1477a4f57..9b6725352863 100644
--- a/include/rocksdb/utilities/stackable_db.h
+++ b/include/rocksdb/utilities/stackable_db.h
@@ -275,6 +275,13 @@ class StackableDB : public DB {
     return db_->NewIterators(options, column_families, iterators);
   }
 
+  using DB::NewMultiCfIterator;
+  virtual MultiCfIterator* NewMultiCfIterator(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>& column_families) override {
+    return db_->NewMultiCfIterator(options, column_families);
+  }
+
   virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }
 
   virtual void ReleaseSnapshot(const Snapshot* snapshot) override {
diff --git a/include/rocksdb/wide_columns.h b/include/rocksdb/wide_columns.h
index 35b81268bed8..e0b889ca4aa1 100644
--- a/include/rocksdb/wide_columns.h
+++ b/include/rocksdb/wide_columns.h
@@ -238,6 +238,11 @@ class AttributeGroup {
   WideColumns columns_;
 };
 
+inline bool operator==(const AttributeGroup& lhs, const AttributeGroup& rhs) {
+  return lhs.column_family() == rhs.column_family() &&
+         lhs.columns() == rhs.columns();
+}
+
 // A collection of Attribute Groups.
using AttributeGroups = std::vector; diff --git a/src.mk b/src.mk index a03a476ff151..7ec221d7603d 100644 --- a/src.mk +++ b/src.mk @@ -76,6 +76,7 @@ LIB_SOURCES = \ db/memtable_list.cc \ db/merge_helper.cc \ db/merge_operator.cc \ + db/multi_cf_iterator.cc \ db/output_validator.cc \ db/periodic_task_scheduler.cc \ db/range_del_aggregator.cc \ @@ -516,6 +517,7 @@ TEST_MAIN_SOURCES = \ db/memtable_list_test.cc \ db/merge_helper_test.cc \ db/merge_test.cc \ + db/multi_cf_iterator_test.cc \ db/obsolete_files_test.cc \ db/options_file_test.cc \ db/perf_context_test.cc \