Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Put with secondary indices #13289

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 125 additions & 49 deletions utilities/secondary_index/secondary_index_mixin.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,23 @@ class SecondaryIndexMixin : public Txn {
}

using Txn::Put;
Status Put(ColumnFamilyHandle* /* column_family */, const Slice& /* key */,
const Slice& /* value */,
const bool /* assume_tracked */ = false) override {
return Status::NotSupported("Put with secondary indices not yet supported");
Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value, const bool assume_tracked = false) override {
return PerformWithSavePoint([&]() {
const bool do_validate = !assume_tracked;
return PutWithSecondaryIndices(column_family, key, value, do_validate);
});
}
Status Put(ColumnFamilyHandle* /* column_family */,
const SliceParts& /* key */, const SliceParts& /* value */,
const bool /* assume_tracked */ = false) override {
return Status::NotSupported("Put with secondary indices not yet supported");
Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value,
const bool assume_tracked = false) override {
std::string key_str;
const Slice key_slice(key, &key_str);

std::string value_str;
const Slice value_slice(value, &value_str);

return Put(column_family, key_slice, value_slice, assume_tracked);
}

Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
Expand Down Expand Up @@ -92,17 +100,22 @@ class SecondaryIndexMixin : public Txn {
}

using Txn::PutUntracked;
Status PutUntracked(ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const Slice& /* value */) override {
return Status::NotSupported(
"PutUntracked with secondary indices not yet supported");
Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override {
return PerformWithSavePoint([&]() {
constexpr bool do_validate = false;
return PutWithSecondaryIndices(column_family, key, value, do_validate);
});
}
Status PutUntracked(ColumnFamilyHandle* /* column_family */,
const SliceParts& /* key */,
const SliceParts& /* value */) override {
return Status::NotSupported(
"PutUntracked with secondary indices not yet supported");
Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override {
std::string key_str;
const Slice key_slice(key, &key_str);

std::string value_str;
const Slice value_slice(value, &value_str);

return PutUntracked(column_family, key_slice, value_slice);
}

Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key,
Expand Down Expand Up @@ -179,21 +192,6 @@ class SecondaryIndexMixin : public Txn {
var);
}

template <typename Iterator>
static Iterator FindPrimaryColumn(const SecondaryIndex* secondary_index,
ColumnFamilyHandle* column_family,
Iterator begin, Iterator end) {
assert(secondary_index);
assert(column_family);

if (column_family != secondary_index->GetPrimaryColumnFamily()) {
return end;
}

return WideColumnsHelper::Find(begin, end,
secondary_index->GetPrimaryColumnName());
}

template <typename Operation>
Status PerformWithSavePoint(Operation&& operation) {
Txn::SetSavePoint();
Expand Down Expand Up @@ -247,6 +245,15 @@ class SecondaryIndexMixin : public Txn {
secondary_key);
}

Status AddPrimaryEntry(ColumnFamilyHandle* column_family,
const Slice& primary_key, const Slice& primary_value) {
assert(column_family);

constexpr bool assume_tracked = true;

return Txn::Put(column_family, primary_key, primary_value, assume_tracked);
}

Status AddPrimaryEntry(ColumnFamilyHandle* column_family,
const Slice& primary_key,
const WideColumns& primary_columns) {
Expand Down Expand Up @@ -317,9 +324,15 @@ class SecondaryIndexMixin : public Txn {
const auto& existing_columns = existing_primary_columns.columns();

for (const auto& secondary_index : *secondary_indices_) {
const auto it =
FindPrimaryColumn(secondary_index.get(), column_family,
existing_columns.cbegin(), existing_columns.cend());
assert(secondary_index);

if (secondary_index->GetPrimaryColumnFamily() != column_family) {
continue;
}

const auto it = WideColumnsHelper::Find(
existing_columns.cbegin(), existing_columns.cend(),
secondary_index->GetPrimaryColumnName());
if (it == existing_columns.cend()) {
continue;
}
Expand All @@ -334,18 +347,67 @@ class SecondaryIndexMixin : public Txn {
return Status::OK();
}

Status UpdatePrimaryColumnValues(ColumnFamilyHandle* column_family,
const Slice& primary_key,
Slice& primary_value,
autovector<IndexData>& applicable_indices) {
assert(column_family);
assert(applicable_indices.empty());

applicable_indices.reserve(secondary_indices_->size());

for (const auto& secondary_index : *secondary_indices_) {
assert(secondary_index);

if (secondary_index->GetPrimaryColumnFamily() != column_family) {
continue;
}

if (secondary_index->GetPrimaryColumnName() != kDefaultWideColumnName) {
continue;
}

applicable_indices.emplace_back(
IndexData(secondary_index.get(), primary_value));

auto& index_data = applicable_indices.back();

const Status s = secondary_index->UpdatePrimaryColumnValue(
primary_key, index_data.previous_column_value(),
&index_data.updated_column_value());
if (!s.ok()) {
return s;
}

primary_value = index_data.primary_column_value();
}

return Status::OK();
}

Status UpdatePrimaryColumnValues(ColumnFamilyHandle* column_family,
const Slice& primary_key,
WideColumns& primary_columns,
autovector<IndexData>& applicable_indices) {
assert(column_family);
assert(applicable_indices.empty());

// TODO: as an optimization, we can avoid calling SortColumns a second time
// in WriteBatchInternal::PutEntity
WideColumnsHelper::SortColumns(primary_columns);

applicable_indices.reserve(secondary_indices_->size());

for (const auto& secondary_index : *secondary_indices_) {
const auto it =
FindPrimaryColumn(secondary_index.get(), column_family,
primary_columns.begin(), primary_columns.end());
assert(secondary_index);

if (secondary_index->GetPrimaryColumnFamily() != column_family) {
continue;
}

const auto it = WideColumnsHelper::Find(
primary_columns.begin(), primary_columns.end(),
secondary_index->GetPrimaryColumnName());
if (it == primary_columns.end()) {
continue;
}
Expand Down Expand Up @@ -382,10 +444,11 @@ class SecondaryIndexMixin : public Txn {
return Status::OK();
}

Status PutEntityWithSecondaryIndices(ColumnFamilyHandle* column_family,
const Slice& key,
const WideColumns& columns,
bool do_validate) {
template <typename Value>
Status PutWithSecondaryIndicesImpl(ColumnFamilyHandle* column_family,
const Slice& key,
const Value& value_or_columns,
bool do_validate) {
// TODO: we could avoid removing and recreating secondary entries for
// which neither the secondary key prefix nor the value has changed

Expand All @@ -403,22 +466,21 @@ class SecondaryIndexMixin : public Txn {
}
}

auto primary_value_or_columns = value_or_columns;
autovector<IndexData> applicable_indices;

WideColumns primary_columns(columns);
WideColumnsHelper::SortColumns(primary_columns);

{
const Status s = UpdatePrimaryColumnValues(
column_family, primary_key, primary_columns, applicable_indices);
const Status s = UpdatePrimaryColumnValues(column_family, primary_key,
primary_value_or_columns,
applicable_indices);
if (!s.ok()) {
return s;
}
}

{
const Status s =
AddPrimaryEntry(column_family, primary_key, primary_columns);
AddPrimaryEntry(column_family, primary_key, primary_value_or_columns);
if (!s.ok()) {
return s;
}
Expand All @@ -434,6 +496,20 @@ class SecondaryIndexMixin : public Txn {
return Status::OK();
}

Status PutWithSecondaryIndices(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value,
bool do_validate) {
return PutWithSecondaryIndicesImpl(column_family, key, value, do_validate);
}

Status PutEntityWithSecondaryIndices(ColumnFamilyHandle* column_family,
const Slice& key,
const WideColumns& columns,
bool do_validate) {
return PutWithSecondaryIndicesImpl(column_family, key, columns,
do_validate);
}

const std::vector<std::shared_ptr<SecondaryIndex>>* secondary_indices_;
};

Expand Down
Loading
Loading