Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce CloseOptions::prepare_close_fn to allow pre-Close callback function #13237

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1336,8 +1336,15 @@ TEST_F(DBBasicTest, DBClose) {
ASSERT_OK(s);
ASSERT_TRUE(db != nullptr);

s = db->Close();
bool prepare_close_fn_called = false;
CloseOptions close_options;
close_options.prepare_close_fn = [&prepare_close_fn_called]() {
prepare_close_fn_called = true;
};

s = db->Close(close_options);
ASSERT_EQ(s, Status::OK());
ASSERT_TRUE(prepare_close_fn_called);
delete db;
ASSERT_EQ(env->GetCloseCount(), 3);
options.info_log.reset();
Expand Down
11 changes: 11 additions & 0 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3622,7 +3622,18 @@ TEST_P(DBCompactionWaitForCompactTest,
ASSERT_EQ(0, flush_finished);
ASSERT_EQ("2", FilesPerLevel());

bool prepare_close_fn_called = false;
if (close_db_) {
CloseOptions close_options;
close_options.prepare_close_fn = [&prepare_close_fn_called]() {
prepare_close_fn_called = true;
};
wait_for_compact_options_.close_options = std::move(close_options);
}
ASSERT_OK(dbfull()->WaitForCompact(wait_for_compact_options_));
if (close_db_) {
ASSERT_TRUE(prepare_close_fn_called);
}

int expected_flush_count = flush_ || close_db_;
ASSERT_EQ(expected_flush_count, flush_finished);
Expand Down
6 changes: 4 additions & 2 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5318,7 +5318,7 @@ Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {

DB::~DB() = default;

Status DBImpl::Close() {
Status DBImpl::Close(const CloseOptions& close_options) {
InstrumentedMutexLock closing_lock_guard(&closing_mutex_);
if (closed_) {
return closing_status_;
Expand All @@ -5330,7 +5330,9 @@ Status DBImpl::Close() {
return s;
}
}

if (close_options.prepare_close_fn) {
close_options.prepare_close_fn();
}
closing_status_ = CloseImpl();
closed_ = true;
return closing_status_;
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ class DBImpl : public DB {

ColumnFamilyHandle* PersistentStatsColumnFamily() const;

Status Close() override;
using DB::Close;
Status Close(const CloseOptions& close_options) override;

Status DisableFileDeletions() override;

Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4429,7 +4429,7 @@ Status DBImpl::WaitForCompact(
} else if (wait_for_compact_options.close_db) {
reject_new_background_jobs_ = true;
mutex_.Unlock();
Status s = Close();
Status s = Close(wait_for_compact_options.close_options);
mutex_.Lock();
if (!s.ok()) {
reject_new_background_jobs_ = false;
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl/db_impl_follower.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ void DBImplFollower::PeriodicRefresh() {
}
}

Status DBImplFollower::Close() {
Status DBImplFollower::Close(const CloseOptions& close_options) {
if (catch_up_thread_) {
stop_requested_.store(true);
{
Expand All @@ -237,7 +237,7 @@ Status DBImplFollower::Close() {

ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem_);

return DBImpl::Close();
return DBImpl::Close(close_options);
}

Status DB::OpenAsFollower(const Options& options, const std::string& dbname,
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_follower.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class DBImplFollower : public DBImplSecondary {
const std::string& dbname, std::string src_path);
~DBImplFollower();

Status Close() override;
using DBImpl::Close;
Status Close(const CloseOptions& close_options) override;

protected:
bool OwnTablesAndLogs() const override {
Expand Down
2 changes: 1 addition & 1 deletion db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3061,7 +3061,7 @@ class ModelDB : public DB {
}

using DB::Close;
Status Close() override { return Status::OK(); }
Status Close(const CloseOptions&) override { return Status::OK(); }
using DB::Delete;
Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf,
const Slice& key) override {
Expand Down
5 changes: 4 additions & 1 deletion include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,10 @@ class DB {
// WaitForCompact() with WaitForCompactOptions.close_db=true will be a good
// choice for users who want to wait for background work before closing
// (rather than aborting and potentially redoing some work on re-open)
virtual Status Close() { return Status::NotSupported(); }
Status Close() { return Close(CloseOptions()); }
virtual Status Close(const CloseOptions& /*close_options*/) {
return Status::NotSupported();
}

// ListColumnFamilies will open the DB specified by argument name
// and return the list of all column families in that DB
Expand Down
6 changes: 6 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2391,6 +2391,10 @@ struct LiveFilesStorageInfoOptions {
uint64_t wal_size_for_flush = 0;
};

struct CloseOptions {
std::function<void()> prepare_close_fn;
};

struct WaitForCompactOptions {
// A boolean to abort waiting in case of a pause (PauseBackgroundWork()
// called) If true, Status::Aborted will be returned immediately. If false,
Expand All @@ -2412,6 +2416,8 @@ struct WaitForCompactOptions {
// returned Aborted status due to unreleased snapshots in the system. See
// comments in DB::Close() for details.
bool close_db = false;
// Options to be used for closing db. If close_db is false, this is ignored
CloseOptions close_options;

// Timeout in microseconds for waiting for compaction to complete.
// Status::TimedOut will be returned if timeout expires.
Expand Down
5 changes: 4 additions & 1 deletion include/rocksdb/utilities/stackable_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ class StackableDB : public DB {
db_ = nullptr;
}

Status Close() override { return db_->Close(); }
using DB::Close;
Status Close(const CloseOptions& close_options) override {
return db_->Close(close_options);
}

virtual DB* GetBaseDB() { return db_; }

Expand Down
1 change: 1 addition & 0 deletions unreleased_history/public_api_changes/close_options.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Introduce `CloseOptions` for `Close()` API. `close_options.prepare_close_fn` allows users to inject a necessary external functions to be called before closing the DB. This option is also integrated with `WaitForCompact()` API when `WaitForCompactOptions::close_db=true`. `WaitForCompactOptions::close_options` can be specified, but is ignored if close_db is false.
2 changes: 1 addition & 1 deletion utilities/blob_db/blob_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class BlobDB : public StackableDB {
}

using ROCKSDB_NAMESPACE::StackableDB::Close;
Status Close() override = 0;
Status Close(const CloseOptions& close_options) override = 0;

// Opening blob db.
static Status Open(const Options& options, const BlobDBOptions& bdb_options,
Expand Down
8 changes: 4 additions & 4 deletions utilities/blob_db/blob_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,24 @@ BlobDBImpl::~BlobDBImpl() {
assert(s.ok());
}

Status BlobDBImpl::Close() {
Status BlobDBImpl::Close(const CloseOptions& close_options) {
ThreadStatus::OperationType cur_op_type =
ThreadStatusUtil::GetThreadOperation();
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_UNKNOWN);
Status s = CloseImpl();
Status s = CloseImpl(close_options);
ThreadStatusUtil::SetThreadOperation(cur_op_type);
return s;
}

Status BlobDBImpl::CloseImpl() {
Status BlobDBImpl::CloseImpl(const CloseOptions& close_options) {
if (closed_) {
return Status::OK();
}
closed_ = true;

// Close base DB before BlobDBImpl destructs to stop event listener and
// compaction filter call.
Status s = db_->Close();
Status s = db_->Close(close_options);
// delete db_ anyway even if close failed.
delete db_;
// Reset pointers to avoid StackableDB delete the pointer again.
Expand Down
5 changes: 3 additions & 2 deletions utilities/blob_db/blob_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ class BlobDBImpl : public BlobDB {
using BlobDB::Write;
Status Write(const WriteOptions& opts, WriteBatch* updates) override;

Status Close() override;
using BlobDB::Close;
Status Close(const CloseOptions& close_options) override;

using BlobDB::PutWithTTL;
Status PutWithTTL(const WriteOptions& options, const Slice& key,
Expand Down Expand Up @@ -406,7 +407,7 @@ class BlobDBImpl : public BlobDB {
uint64_t blob_size,
bool force_evict = false);

Status CloseImpl();
Status CloseImpl(const CloseOptions& close_options);

// name of the database directory
std::string dbname_;
Expand Down
4 changes: 2 additions & 2 deletions utilities/ttl/db_ttl_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,13 @@ DBWithTTLImpl::~DBWithTTLImpl() {
}
}

Status DBWithTTLImpl::Close() {
Status DBWithTTLImpl::Close(const CloseOptions& close_options) {
Status ret = Status::OK();
if (!closed_) {
Options default_options = GetOptions();
// Need to stop background compaction before getting rid of the filter
CancelAllBackgroundWork(db_, /* wait = */ true);
ret = db_->Close();
ret = db_->Close(close_options);
delete default_options.compaction_filter;
closed_ = true;
}
Expand Down
3 changes: 2 additions & 1 deletion utilities/ttl/db_ttl_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class DBWithTTLImpl : public DBWithTTL {

virtual ~DBWithTTLImpl();

Status Close() override;
using StackableDB::Close;
Status Close(const CloseOptions& close_options) override;

Status CreateColumnFamilyWithTtl(const ColumnFamilyOptions& options,
const std::string& column_family_name,
Expand Down
Loading