test: add test_migration_timeout_on_sync
BorysTheDev committed Nov 12, 2024
1 parent 0467c37 commit 1b355a3
Showing 9 changed files with 102 additions and 42 deletions.
10 changes: 4 additions & 6 deletions src/server/cluster/cluster_family.cc
@@ -1025,18 +1025,16 @@ void ClusterFamily::BreakStalledFlowsInShard() {
LOG(WARNING) << "Source node detected migration timeout for: "
<< om->GetMigrationInfo().ToString()
<< " last_write_ms: " << last_write_ns / 1000'000 << ", now: " << now / 1000'000;
// TODO add error string
om->Finish(true);
om->Start();
om->Finish(true, "Detected migration timeout");
}
}
}

void ClusterFamily::PauseMigration(bool pause) {
util::fb2::LockGuard lk(migration_mu_);
CHECK(!outgoing_migration_jobs_.empty());
for (auto& om : outgoing_migration_jobs_) {
om->Pause(pause);
CHECK(!incoming_migrations_jobs_.empty());
for (auto& im : incoming_migrations_jobs_) {
im->Pause(pause);
}
}

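The warning above reports last_write_ms, and later in this commit the journal streamer starts tracking last_write_time_ns_ (set when an async write is issued, reset to -1 once it completes). The source-side stall check presumably amounts to comparing that timestamp against the migration_timeout flag; a rough sketch of that comparison, with a made-up helper name and under the assumption that the timeout is expressed in milliseconds:

def migration_stalled(last_write_time_ns: int, now_ns: int, timeout_ms: int) -> bool:
    # Illustrative only: a flow counts as stalled when its oldest in-flight
    # write has been pending for longer than the migration timeout.
    if last_write_time_ns == -1:
        return False  # nothing is in flight, so there is nothing to time out
    return (now_ns - last_write_time_ns) > timeout_ms * 1_000_000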
17 changes: 17 additions & 0 deletions src/server/cluster/incoming_slot_migration.cc
@@ -38,6 +38,10 @@ class ClusterShardMigration {
bc_(bc) {
}

void Pause(bool pause) {
pause_ = pause;
}

void Start(Context* cntx, util::FiberSocketBase* source) ABSL_LOCKS_EXCLUDED(mu_) {
{
util::fb2::LockGuard lk(mu_);
@@ -56,6 +60,11 @@
TransactionReader tx_reader;

while (!cntx->IsCancelled()) {
if (pause_) {
ThisFiber::SleepFor(100ms);
continue;
}

auto tx_data = tx_reader.NextTxData(&reader, cntx);
if (!tx_data) {
in_migration_->ReportError(GenericError("No tx data"));
@@ -135,6 +144,7 @@ class ClusterShardMigration {
IncomingSlotMigration* in_migration_;
util::fb2::BlockingCounter bc_;
atomic_long last_attempt_{-1};
atomic_bool pause_;
};

IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
@@ -153,6 +163,13 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
IncomingSlotMigration::~IncomingSlotMigration() {
}

void IncomingSlotMigration::Pause(bool pause) {
VLOG(1) << "Pausing migration";
for (auto& flow : shard_flows_) {
flow->Pause(pause);
}
}

bool IncomingSlotMigration::Join(long attempt) {
const absl::Time start = absl::Now();
const absl::Duration timeout =
2 changes: 2 additions & 0 deletions src/server/cluster/incoming_slot_migration.h
@@ -59,6 +59,8 @@ class IncomingSlotMigration {

size_t GetKeyCount() const;

void Pause(bool pause);

private:
std::string source_id_;
Service& service_;
30 changes: 13 additions & 17 deletions src/server/cluster/outgoing_slot_migration.cc
@@ -86,10 +86,6 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
streamer_.SendFinalize(attempt);
}

void Pause(bool pause) {
streamer_.Pause(pause);
}

const dfly::GenericError GetError() const {
return cntx_.GetError();
}
@@ -105,7 +101,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
int64_t OutgoingMigration::GetShardLastWriteTime() const {
const auto* shard = EngineShard::tlocal();
CHECK(shard);
return slot_migrations_[shard->shard_id()]->GetLastWriteTime();
const auto& migration = slot_migrations_[shard->shard_id()];
return migration ? migration->GetLastWriteTime() : -1;
}

OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, ServerFamily* sf)
@@ -145,9 +142,16 @@ void OutgoingMigration::OnAllShards(
});
}

void OutgoingMigration::Finish(bool is_error) {
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : "
<< migration_info_.node_info.id;
void OutgoingMigration::Finish(bool is_error, std::string error) {
if (is_error) {
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id << " with error: " << error;
cntx_.ReportError(GenericError(std::move(error)));
} else {
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id;
}

bool should_cancel_flows = false;

{
@@ -177,14 +181,6 @@ void OutgoingMigration::Finish(bool is_error) {
}
}

void OutgoingMigration::Pause(bool pause) {
VLOG(1) << "Pausing migration";
OnAllShards([pause](auto& migration) {
CHECK(migration != nullptr);
migration->Pause(pause);
});
}

MigrationState OutgoingMigration::GetState() const {
util::fb2::LockGuard lk(state_mu_);
return state_;
@@ -313,7 +309,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
auto pause_fb_opt =
dfly::Pause(server_family_->GetNonPriviligedListeners(), &namespaces.GetDefaultNamespace(),
dfly::Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(),
nullptr, ClientPause::WRITE, is_pause_in_progress);

if (!pause_fb_opt) {
7 changes: 3 additions & 4 deletions src/server/cluster/outgoing_slot_migration.h
@@ -30,11 +30,10 @@ class OutgoingMigration : private ProtocolClient {
// start migration process, sends INIT command to the target node
void Start();

// mark migration as FINISHED and cancel migration if it's not finished yet
// if is_error = false, mark the migration as FINISHED and cancel it if it's not finished yet
// can be called from any thread, but only after Start()
void Finish(bool is_error = false) ABSL_LOCKS_EXCLUDED(state_mu_);

void Pause(bool pause);
// if is_error = true and the migration is in progress, it will be restarted; otherwise nothing happens
void Finish(bool is_error = false, std::string error = "") ABSL_LOCKS_EXCLUDED(state_mu_);

MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);

8 changes: 3 additions & 5 deletions src/server/journal/streamer.cc
@@ -120,6 +120,7 @@ void JournalStreamer::Write(std::string_view str) {
}
v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));

last_write_time_ns_ = absl::GetCurrentTimeNanos();
dest_->AsyncWrite(
v, next_buf_id,
[buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
@@ -129,6 +130,7 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
}

void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
last_write_time_ns_ = -1;
DCHECK_GE(in_flight_bytes_, len);

DVLOG(2) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len;
@@ -139,6 +141,7 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
// If everything was sent but we have a pending buf, flush it.
io::Bytes src(pending_buf_);
in_flight_bytes_ += src.size();
last_write_time_ns_ = absl::GetCurrentTimeNanos();
dest_->AsyncWrite(src, [buf = std::move(pending_buf_), this](std::error_code ec) {
OnCompletion(ec, buf.size());
});
@@ -210,10 +213,6 @@ void RestoreStreamer::Run() {
PrimeTable* pt = &db_array_[0]->prime;

do {
if (pause_) {
ThisFiber::SleepFor(100ms);
continue;
}
if (fiber_cancelled_)
return;

@@ -352,7 +351,6 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const Pr
// TODO: From DumpObject to till Write we tripple copy the PrimeValue. It's very inefficient and
// will burn CPU for large values.
Write(restore_cmd_sink.str());
last_write_time_ns_ = absl::GetCurrentTimeNanos();
}

} // namespace dfly
15 changes: 5 additions & 10 deletions src/server/journal/streamer.h
@@ -32,6 +32,10 @@ class JournalStreamer {

size_t GetTotalBufferCapacities() const;

int64_t GetLastWriteTime() const {
return last_write_time_ns_;
}

protected:
// TODO: we copy the string on each write because JournalItem may be passed to multiple
// streamers so we can not move it. However, if we would either wrap JournalItem in shared_ptr
@@ -68,6 +72,7 @@
time_t last_lsn_time_ = 0;
util::fb2::EventCount waker_;
uint32_t journal_cb_id_{0};
int64_t last_write_time_ns_ = -1;  // time of the last write (AsyncWrite) call; -1 when no write is in flight.
};

// Serializes existing DB as RESTORE commands, and sends updates as regular commands.
@@ -90,14 +95,6 @@ class RestoreStreamer : public JournalStreamer {
return snapshot_finished_;
}

int64_t GetLastWriteTime() const {
return last_write_time_ns_;
}

void Pause(bool pause) {
pause_ = pause;
}

private:
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
bool ShouldWrite(const journal::JournalItem& item) const override;
@@ -114,8 +111,6 @@
cluster::SlotSet my_slots_;
bool fiber_cancelled_ = false;
bool snapshot_finished_ = false;
bool pause_ = false;
int64_t last_write_time_ns_ = -1; // last write call.
};

} // namespace dfly
2 changes: 2 additions & 0 deletions src/server/main_service.cc
@@ -809,6 +809,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>

config_registry.RegisterMutable("replica_partial_sync");
config_registry.RegisterMutable("replication_timeout");
config_registry.RegisterMutable("migration_timeout");
config_registry.RegisterMutable("table_growth_margin");
config_registry.RegisterMutable("tcp_keepalive");

@@ -864,6 +865,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
// Initialize shard_set with a global callback running once in a while in the shard threads.
shard_set->Init(shard_num, [this] {
server_family_.GetDflyCmd()->BreakStalledFlowsInShard();
cluster_family_.BreakStalledFlowsInShard();
server_family_.UpdateMemoryGlobalStats();
});
Transaction::Init(shard_num);
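
Since migration_timeout is now registered as a mutable config, it should be adjustable on a running node alongside the other flags in this list, presumably via CONFIG SET. A minimal sketch using redis-py; the port and the value are placeholders, and the unit is assumed to be milliseconds:

import redis

# Assumes a locally running node; flags registered with RegisterMutable
# are expected to be visible to CONFIG GET / CONFIG SET at runtime.
client = redis.Redis(port=6379)
client.config_set("migration_timeout", 500)
print(client.config_get("migration_timeout"))  # e.g. {'migration_timeout': '500'}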
53 changes: 53 additions & 0 deletions tests/dragonfly/cluster_test.py
@@ -2050,3 +2050,56 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFactory
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

await check_for_no_state_status([node.admin_client for node in nodes])


@pytest.mark.asyncio
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_seeder_factory):
# set migration_timeout to a very small value so the source node quickly detects a migration timeout
instances = [
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
migration_timeout=100,
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
)
for i in range(2)
]

df_factory.start_all(instances)

nodes = [(await create_node_info(instance)) for instance in instances]
nodes[0].slots = [(0, 16383)]
nodes[1].slots = []

await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

logging.debug("source node DEBUG POPULATE")
await nodes[0].client.execute_command("debug", "populate", "200000", "foo", "1000")

seeder = df_seeder_factory.create(port=nodes[0].instance.port, cluster_mode=True)
seeder_task = asyncio.create_task(seeder.run(target_deviation=0.1))

await asyncio.sleep(0.5)  # wait for the seeder to start running

logging.debug("Start migration")
nodes[0].migrations.append(
MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id)
)
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

await wait_for_status(nodes[0].admin_client, nodes[1].id, "SYNC")

logging.debug("debug migration pause")
await nodes[1].client.execute_command("debug migration pause")

await asyncio.sleep(1)

logging.debug("debug migration resume")
await nodes[1].client.execute_command("debug migration resume")

await asyncio.sleep(1) # migration will start resync
seeder.stop()
await seeder_task

await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
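
The pause/resume pair above could also be wrapped in a small helper so the flows are always resumed even if an assertion fails in between; a sketch assuming the same async client API the test already uses (this helper is not part of the commit):

from contextlib import asynccontextmanager

@asynccontextmanager
async def paused_migration(client):
    # Pause incoming migration flows on the target node and make sure they are
    # resumed when the block exits, even on failure.
    await client.execute_command("debug migration pause")
    try:
        yield
    finally:
        await client.execute_command("debug migration resume")

With it, the middle of the test would read: async with paused_migration(nodes[1].client): await asyncio.sleep(1).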
