Skip to content

Commit

Permalink
chore: remove DflyVersion::VER0 (#3593)
Browse files Browse the repository at this point in the history
Stop supporting DflyVersion::VER0 from more than a year ago.
In addition, rename Metrics fields to make them more clear
General improvements and fix the reconnect metric.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Aug 28, 2024
1 parent 24b8d32 commit 0ee52c9
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ struct ConnectionState {
uint32_t repl_flow_id = UINT32_MAX;
std::string repl_ip_address;
uint32_t repl_listening_port = 0;
DflyVersion repl_version = DflyVersion::VER0;
DflyVersion repl_version = DflyVersion::VER1;
};

struct SquashingInfo {
Expand Down
5 changes: 3 additions & 2 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct FlowInfo {
std::unique_ptr<JournalStreamer> streamer; // Streamer for stable sync phase
std::string eof_token;

DflyVersion version = DflyVersion::VER0;
DflyVersion version = DflyVersion::VER1;

std::optional<LSN> start_partial_sync_at;
uint64_t last_acked_lsn = 0;
Expand Down Expand Up @@ -128,7 +128,7 @@ class DflyCmd {
std::string id;
std::string address;
uint32_t listening_port;
DflyVersion version = DflyVersion::VER0;
DflyVersion version = DflyVersion::VER1;

// Flows describe the state of shard-local flow.
// They are always indexed by the shard index on the master.
Expand All @@ -153,6 +153,7 @@ class DflyCmd {
// Master side acces method to replication info of that connection.
std::shared_ptr<ReplicaInfo> GetReplicaInfoFromConnection(ConnectionContext* cntx);

// Master-side command. Provides Replica info.
std::vector<ReplicaRoleInfo> GetReplicasRoleInfo() const ABSL_LOCKS_EXCLUDED(mu_);

void GetReplicationMemoryStats(ReplicationMemoryStats* out) const ABSL_LOCKS_EXCLUDED(mu_);
Expand Down
101 changes: 51 additions & 50 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ void Replica::Pause(bool pause) {
VLOG(1) << "Pausing replication";
Proactor()->Await([&] {
is_paused_ = pause;
if (num_df_flows_ > 0) {
auto partition = Partition(num_df_flows_);
auto cb = [&](unsigned index, auto*) {
for (auto id : partition[index]) {
shard_flows_[id]->Pause(pause);
}
};
shard_set->pool()->AwaitBrief(cb);
}
if (shard_flows_.empty())
return;

auto cb = [&](unsigned index, auto*) {
for (auto id : thread_flow_map_[index]) {
shard_flows_[id]->Pause(pause);
}
};
shard_set->pool()->AwaitBrief(cb);
});
}

Expand Down Expand Up @@ -347,14 +347,15 @@ std::error_code Replica::HandleCapaDflyResp() {
}
master_context_.master_repl_id = master_repl_id;
master_context_.dfly_session_id = ToSV(LastResponseArgs()[1].GetBuf());
num_df_flows_ = param_num_flows;
master_context_.num_flows = param_num_flows;

if (LastResponseArgs().size() >= 4) {
PC_RETURN_ON_BAD_RESPONSE(LastResponseArgs()[3].type == RespExpr::INT64);
master_context_.version = DflyVersion(get<int64_t>(LastResponseArgs()[3].u));
}
VLOG(1) << "Master id: " << master_context_.master_repl_id
<< ", sync id: " << master_context_.dfly_session_id << ", num journals: " << num_df_flows_
<< ", sync id: " << master_context_.dfly_session_id
<< ", num journals: " << param_num_flows
<< ", version: " << unsigned(master_context_.version);

return error_code{};
Expand All @@ -369,12 +370,9 @@ std::error_code Replica::ConfigureDflyMaster() {
LOG(WARNING) << "Bad REPLCONF CLIENT-ID response";
}

// Tell the master our version if it supports REPLCONF CLIENT-VERSION
if (master_context_.version > DflyVersion::VER0) {
RETURN_ON_ERR(
SendCommandAndReadResponse(StrCat("REPLCONF CLIENT-VERSION ", DflyVersion::CURRENT_VER)));
PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK"));
}
RETURN_ON_ERR(
SendCommandAndReadResponse(StrCat("REPLCONF CLIENT-VERSION ", DflyVersion::CURRENT_VER)));
PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK"));

return error_code{};
}
Expand Down Expand Up @@ -476,14 +474,16 @@ error_code Replica::InitiateDflySync() {
multi_shard_exe_.reset(new MultiShardExecution());

// Initialize shard flows.
shard_flows_.resize(num_df_flows_);
for (unsigned i = 0; i < num_df_flows_; ++i) {
shard_flows_.resize(master_context_.num_flows);
DCHECK(!shard_flows_.empty());
for (unsigned i = 0; i < shard_flows_.size(); ++i) {
shard_flows_[i].reset(
new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_));
}
thread_flow_map_ = Partition(shard_flows_.size());

// Blocked on until all flows got full sync cut.
BlockingCounter sync_block{num_df_flows_};
BlockingCounter sync_block{unsigned(shard_flows_.size())};

// Switch to new error handler that closes flow sockets.
auto err_handler = [this, sync_block](const auto& ge) mutable {
Expand Down Expand Up @@ -516,12 +516,12 @@ error_code Replica::InitiateDflySync() {

std::string_view sync_type = "full";
{
unsigned num_df_flows = shard_flows_.size();
// Going out of the way to avoid using std::vector<bool>...
auto is_full_sync = std::make_unique<bool[]>(num_df_flows_);
auto partition = Partition(num_df_flows_);
CHECK(!last_journal_LSNs_ || last_journal_LSNs_->size() == shard_flows_.size());
auto is_full_sync = std::make_unique<bool[]>(num_df_flows);
DCHECK(!last_journal_LSNs_ || last_journal_LSNs_->size() == num_df_flows);
auto shard_cb = [&](unsigned index, auto*) {
for (auto id : partition[index]) {
for (auto id : thread_flow_map_[index]) {
auto ec = shard_flows_[id]->StartSyncFlow(sync_block, &cntx_,
last_journal_LSNs_.has_value()
? std::optional((*last_journal_LSNs_)[id])
Expand All @@ -538,9 +538,9 @@ error_code Replica::InitiateDflySync() {
shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));

size_t num_full_flows =
std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows_, 0);
std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows, 0);

if (num_full_flows == num_df_flows_) {
if (num_full_flows == num_df_flows) {
if (slot_range_.has_value()) {
JournalExecutor{&service_}.FlushSlots(slot_range_.value());
} else {
Expand Down Expand Up @@ -668,9 +668,8 @@ error_code Replica::ConsumeDflyStream() {
LOG(INFO) << "Transitioned into stable sync";
// Transition flows into stable sync.
{
auto partition = Partition(num_df_flows_);
auto shard_cb = [&](unsigned index, auto*) {
const auto& local_ids = partition[index];
const auto& local_ids = thread_flow_map_[index];
for (unsigned id : local_ids) {
auto ec = shard_flows_[id]->StartStableSyncFlow(&cntx_);
if (ec)
Expand Down Expand Up @@ -723,6 +722,7 @@ io::Result<bool> DflyShardReplica::StartSyncFlow(BlockingCounter sb, Context* cn
std::optional<LSN> lsn) {
using nonstd::make_unexpected;
DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
proactor_index_ = ProactorBase::me()->GetPoolIndex();

RETURN_ON_ERR_T(make_unexpected,
ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, &cntx_));
Expand Down Expand Up @@ -826,14 +826,15 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
if (auto jo = rdb_loader_->journal_offset(); jo.has_value()) {
this->journal_rec_executed_.store(*jo);
} else {
if (master_context_.version > DflyVersion::VER0)
cntx->ReportError(std::make_error_code(errc::protocol_error),
"Error finding journal offset in stream");
cntx->ReportError(std::make_error_code(errc::protocol_error),
"Error finding journal offset in stream");
}
VLOG(1) << "FullSyncDflyFb finished after reading " << rdb_loader_->bytes_read() << " bytes";
}

void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
DCHECK_EQ(proactor_index_, ProactorBase::me()->GetPoolIndex());

// Check leftover from full sync.
io::Bytes prefix{};
if (leftover_buf_ && leftover_buf_->InputLen() > 0) {
Expand All @@ -846,15 +847,15 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
DCHECK_GE(journal_rec_executed_, 1u);
TransactionReader tx_reader{journal_rec_executed_.load(std::memory_order_relaxed) - 1};

if (master_context_.version > DflyVersion::VER0) {
acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx);
}
acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx);

while (!cntx->IsCancelled()) {
auto tx_data = tx_reader.NextTxData(&reader, cntx);
if (!tx_data)
break;

DVLOG(3) << "Lsn: " << tx_data->lsn;

last_io_time_ = Proactor()->GetMonotonicTimeNs();
if (tx_data->opcode == journal::Op::LSN) {
// Do nothing
Expand Down Expand Up @@ -895,6 +896,8 @@ void Replica::RedisStreamAcksFb() {
}

void DflyShardReplica::StableSyncDflyAcksFb(Context* cntx) {
DCHECK_EQ(proactor_index_, ProactorBase::me()->GetPoolIndex());

constexpr size_t kAckRecordMaxInterval = 1024;
std::chrono::duration ack_time_max_interval =
1ms * absl::GetFlag(FLAGS_replication_acks_interval);
Expand Down Expand Up @@ -1077,6 +1080,9 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d
auto Replica::GetSummary() const -> Summary {
auto f = [this]() {
auto last_io_time = LastIoTime();

// Note: we access LastIoTime from foreigh thread in unsafe manner. However, specifically here
// it's unlikely to cause a real bug.
for (const auto& flow : shard_flows_) { // Get last io time from all sub flows.
last_io_time = std::max(last_io_time, flow->LastIoTime());
}
Expand All @@ -1095,18 +1101,17 @@ auto Replica::GetSummary() const -> Summary {

if (Sock())
return Proactor()->AwaitBrief(f);
else {
/**
* when this branch happens: there is a very short grace period
* where Sock() is not initialized, yet the server can
* receive ROLE/INFO commands. That period happens when launching
* an instance with '--replicaof' and then immediately
* sending a command.
*
* In that instance, we have to run f() on the current fiber.
*/
return f();
}

/**
* when this branch happens: there is a very short grace period
* where Sock() is not initialized, yet the server can
* receive ROLE/INFO commands. That period happens when launching
* an instance with '--replicaof' and then immediately
* sending a command.
*
* In that instance, we have to run f() on the current fiber.
*/
return f();
}

std::vector<uint64_t> Replica::GetReplicaOffset() const {
Expand All @@ -1129,10 +1134,6 @@ uint32_t DflyShardReplica::FlowId() const {
return flow_id_;
}

uint64_t DflyShardReplica::JournalExecutedCount() const {
return journal_rec_executed_.load(std::memory_order_relaxed);
}

void DflyShardReplica::Pause(bool pause) {
if (rdb_loader_) {
rdb_loader_->Pause(pause);
Expand Down
20 changes: 12 additions & 8 deletions src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class DflyShardReplica;
struct MasterContext {
std::string master_repl_id;
std::string dfly_session_id; // Sync session id for dfly sync.
DflyVersion version = DflyVersion::VER0;
unsigned num_flows = 0;
DflyVersion version = DflyVersion::VER1;
};

// This class manages replication from both Dragonfly and Redis masters.
Expand Down Expand Up @@ -122,7 +123,7 @@ class Replica : ProtocolClient {
uint32_t reconnect_count;
};

Summary GetSummary() const; // thread-safe, blocks fiber
Summary GetSummary() const; // thread-safe, blocks fiber, makes a hop.

bool HasDflyMaster() const {
return !master_context_.dfly_session_id.empty();
Expand All @@ -142,6 +143,8 @@ class Replica : ProtocolClient {
util::fb2::EventCount replica_waker_;

std::vector<std::unique_ptr<DflyShardReplica>> shard_flows_;
std::vector<std::vector<unsigned>> thread_flow_map_; // a map from proactor id to flow list.

// A vector of the last executer LSNs when a replication is interrupted.
// Allows partial sync on reconnects.
std::optional<std::vector<LSN>> last_journal_LSNs_;
Expand All @@ -154,7 +157,6 @@ class Replica : ProtocolClient {
// ack_offs_ last acknowledged offset.
size_t repl_offs_ = 0, ack_offs_ = 0;
std::atomic<unsigned> state_mask_ = 0;
unsigned num_df_flows_ = 0;

bool is_paused_ = false;
std::string id_;
Expand Down Expand Up @@ -196,8 +198,11 @@ class DflyShardReplica : public ProtocolClient {

uint32_t FlowId() const;

uint64_t JournalExecutedCount() const;
uint64_t JournalExecutedCount() const {
return journal_rec_executed_.load(std::memory_order_relaxed);
}

// Can be called from any thread.
void Pause(bool pause);

private:
Expand All @@ -218,13 +223,12 @@ class DflyShardReplica : public ProtocolClient {
// Note: This is not 1-to-1 the LSN in the master, because this counts
// **executed** records, which might be received interleaved when commands
// run out-of-order on the master instance.
// Atomic, because JournalExecutedCount() can be called from any thread.
std::atomic_uint64_t journal_rec_executed_ = 0;

util::fb2::Fiber sync_fb_;

util::fb2::Fiber acks_fb_;
util::fb2::Fiber sync_fb_, acks_fb_;
size_t ack_offs_ = 0;

int proactor_index_ = -1;
bool force_ping_ = false;

std::shared_ptr<MultiShardExecution> multi_shard_exe_;
Expand Down
Loading

0 comments on commit 0ee52c9

Please sign in to comment.