diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc
index 9dec758ff90a..e42ac7b128a6 100644
--- a/src/server/cluster/cluster_family.cc
+++ b/src/server/cluster/cluster_family.cc
@@ -1025,18 +1025,16 @@ void ClusterFamily::BreakStalledFlowsInShard() {
       LOG(WARNING) << "Source node detected migration timeout for: "
                    << om->GetMigrationInfo().ToString() << " last_write_ms: "
                    << last_write_ns / 1000'000 << ", now: " << now / 1000'000;
-      // TODO add error string
-      om->Finish(true);
-      om->Start();
+      om->Finish(true, "Detected migration timeout");
     }
   }
 }
 
 void ClusterFamily::PauseMigration(bool pause) {
   util::fb2::LockGuard lk(migration_mu_);
-  CHECK(!outgoing_migration_jobs_.empty());
-  for (auto& om : outgoing_migration_jobs_) {
-    om->Pause(pause);
+  CHECK(!incoming_migrations_jobs_.empty());
+  for (auto& im : incoming_migrations_jobs_) {
+    im->Pause(pause);
   }
 }
 
diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc
index 6cfb01f2c8f5..b42244ed9819 100644
--- a/src/server/cluster/incoming_slot_migration.cc
+++ b/src/server/cluster/incoming_slot_migration.cc
@@ -38,6 +38,10 @@ class ClusterShardMigration {
         bc_(bc) {
   }
 
+  void Pause(bool pause) {
+    pause_ = pause;
+  }
+
   void Start(Context* cntx, util::FiberSocketBase* source) ABSL_LOCKS_EXCLUDED(mu_) {
     {
       util::fb2::LockGuard lk(mu_);
@@ -56,6 +60,11 @@ class ClusterShardMigration {
     TransactionReader tx_reader;
 
     while (!cntx->IsCancelled()) {
+      if (pause_) {
+        ThisFiber::SleepFor(100ms);
+        continue;
+      }
+
       auto tx_data = tx_reader.NextTxData(&reader, cntx);
       if (!tx_data) {
         in_migration_->ReportError(GenericError("No tx data"));
@@ -135,6 +144,7 @@ class ClusterShardMigration {
   IncomingSlotMigration* in_migration_;
   util::fb2::BlockingCounter bc_;
   atomic_long last_attempt_{-1};
+  atomic_bool pause_{false};
 };
 
 IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
@@ -153,6 +163,13 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
 IncomingSlotMigration::~IncomingSlotMigration() {
 }
 
+void IncomingSlotMigration::Pause(bool pause) {
+  VLOG(1) << (pause ? "Pausing" : "Resuming") << " migration";
+  for (auto& flow : shard_flows_) {
+    flow->Pause(pause);
+  }
+}
+
 bool IncomingSlotMigration::Join(long attempt) {
   const absl::Time start = absl::Now();
   const absl::Duration timeout =
diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h
index 02ad3c2021c8..d8af4fce8e93 100644
--- a/src/server/cluster/incoming_slot_migration.h
+++ b/src/server/cluster/incoming_slot_migration.h
@@ -59,6 +59,8 @@ class IncomingSlotMigration {
 
   size_t GetKeyCount() const;
 
+  void Pause(bool pause);
+
  private:
   std::string source_id_;
   Service& service_;
diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index 55957544d570..95953c9cedf6 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -86,10 +86,6 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
     streamer_.SendFinalize(attempt);
   }
 
-  void Pause(bool pause) {
-    streamer_.Pause(pause);
-  }
-
   const dfly::GenericError GetError() const {
     return cntx_.GetError();
   }
@@ -105,7 +101,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
 int64_t OutgoingMigration::GetShardLastWriteTime() const {
   const auto* shard = EngineShard::tlocal();
   CHECK(shard);
-  return slot_migrations_[shard->shard_id()]->GetLastWriteTime();
+  const auto& migration = slot_migrations_[shard->shard_id()];
+  return migration ? migration->GetLastWriteTime() : -1;
 }
 
 OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, ServerFamily* sf)
@@ -145,9 +142,16 @@ void OutgoingMigration::OnAllShards(
   });
 }
 
-void OutgoingMigration::Finish(bool is_error) {
-  VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : "
-          << migration_info_.node_info.id;
+void OutgoingMigration::Finish(bool is_error, std::string error) {
+  if (is_error) {
+    VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
+            << migration_info_.node_info.id << " with error: " << error;
+    cntx_.ReportError(GenericError(std::move(error)));
+  } else {
+    VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
+            << migration_info_.node_info.id;
+  }
+
   bool should_cancel_flows = false;
 
   {
@@ -177,14 +181,6 @@ void OutgoingMigration::Finish(bool is_error) {
   }
 }
 
-void OutgoingMigration::Pause(bool pause) {
-  VLOG(1) << "Pausing migration";
-  OnAllShards([pause](auto& migration) {
-    CHECK(migration != nullptr);
-    migration->Pause(pause);
-  });
-}
-
 MigrationState OutgoingMigration::GetState() const {
   util::fb2::LockGuard lk(state_mu_);
   return state_;
@@ -313,7 +309,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   bool is_block_active = true;
   auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
   auto pause_fb_opt =
-      dfly::Pause(server_family_->GetNonPriviligedListeners(), &namespaces.GetDefaultNamespace(),
+      dfly::Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(),
                   nullptr, ClientPause::WRITE, is_pause_in_progress);
 
   if (!pause_fb_opt) {
diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h
index ae744e09fd94..eab9166f85a1 100644
--- a/src/server/cluster/outgoing_slot_migration.h
+++ b/src/server/cluster/outgoing_slot_migration.h
@@ -30,11 +30,10 @@ class OutgoingMigration : private ProtocolClient {
   // start migration process, sends INIT command to the target node
   void Start();
 
-  // mark migration as FINISHED and cancel migration if it's not finished yet
+  // if is_error = false, marks the migration as FINISHED and cancels it if it's not finished yet
   // can be called from any thread, but only after Start()
-  void Finish(bool is_error = false) ABSL_LOCKS_EXCLUDED(state_mu_);
-
-  void Pause(bool pause);
+  // if is_error = true, an in-progress migration will be restarted; otherwise nothing happens
+  void Finish(bool is_error = false, std::string error = "") ABSL_LOCKS_EXCLUDED(state_mu_);
 
   MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);
 
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index b3da7e9660ae..0e2c5559b83c 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -120,6 +120,7 @@ void JournalStreamer::Write(std::string_view str) {
   }
 
   v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));
+  last_write_time_ns_ = absl::GetCurrentTimeNanos();
   dest_->AsyncWrite(
       v, next_buf_id,
       [buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
@@ -129,6 +130,7 @@ void JournalStreamer::Write(std::string_view str) {
 }
 
 void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
+  last_write_time_ns_ = -1;
   DCHECK_GE(in_flight_bytes_, len);
 
   DVLOG(2) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len;
@@ -139,6 +141,7 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
     // If everything was sent but we have a pending buf, flush it.
     io::Bytes src(pending_buf_);
     in_flight_bytes_ += src.size();
+    last_write_time_ns_ = absl::GetCurrentTimeNanos();
     dest_->AsyncWrite(src, [buf = std::move(pending_buf_), this](std::error_code ec) {
       OnCompletion(ec, buf.size());
     });
@@ -210,10 +213,6 @@ void RestoreStreamer::Run() {
   PrimeTable* pt = &db_array_[0]->prime;
 
   do {
-    if (pause_) {
-      ThisFiber::SleepFor(100ms);
-      continue;
-    }
     if (fiber_cancelled_)
       return;
 
@@ -352,7 +351,6 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const Pr
   // TODO: From DumpObject to till Write we tripple copy the PrimeValue. It's very inefficient and
   // will burn CPU for large values.
   Write(restore_cmd_sink.str());
-  last_write_time_ns_ = absl::GetCurrentTimeNanos();
 }
 
 }  // namespace dfly
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index 20a54dc98815..40c89e8ab90a 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -32,6 +32,10 @@ class JournalStreamer {
 
   size_t GetTotalBufferCapacities() const;
 
+  int64_t GetLastWriteTime() const {
+    return last_write_time_ns_;
+  }
+
  protected:
   // TODO: we copy the string on each write because JournalItem may be passed to multiple
   // streamers so we can not move it. However, if we would either wrap JournalItem in shared_ptr
@@ -68,6 +72,7 @@ class JournalStreamer {
   time_t last_lsn_time_ = 0;
   util::fb2::EventCount waker_;
   uint32_t journal_cb_id_{0};
+  int64_t last_write_time_ns_ = -1;  // last write call.
 };
 
 // Serializes existing DB as RESTORE commands, and sends updates as regular commands.
@@ -90,14 +95,6 @@ class RestoreStreamer : public JournalStreamer {
     return snapshot_finished_;
   }
 
-  int64_t GetLastWriteTime() const {
-    return last_write_time_ns_;
-  }
-
-  void Pause(bool pause) {
-    pause_ = pause;
-  }
-
  private:
   void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
   bool ShouldWrite(const journal::JournalItem& item) const override;
@@ -114,8 +111,6 @@ class RestoreStreamer : public JournalStreamer {
   cluster::SlotSet my_slots_;
   bool fiber_cancelled_ = false;
   bool snapshot_finished_ = false;
-  bool pause_ = false;
-  int64_t last_write_time_ns_ = -1;  // last write call.
 };
 
 }  // namespace dfly
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 6fa009ef0fad..22723a30f4ea 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -809,6 +809,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector
 
   config_registry.RegisterMutable("replica_partial_sync");
   config_registry.RegisterMutable("replication_timeout");
+  config_registry.RegisterMutable("migration_timeout");
   config_registry.RegisterMutable("table_growth_margin");
   config_registry.RegisterMutable("tcp_keepalive");
 
@@ -864,6 +865,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector
   // Initialize shard_set with a global callback running once in a while in the shard threads.
   shard_set->Init(shard_num, [this] {
     server_family_.GetDflyCmd()->BreakStalledFlowsInShard();
+    cluster_family_.BreakStalledFlowsInShard();
     server_family_.UpdateMemoryGlobalStats();
   });
   Transaction::Init(shard_num);
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index 3a19cfd022d6..4046201f53ac 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -2050,3 +2050,56 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     await check_for_no_state_status([node.admin_client for node in nodes])
+
+
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_seeder_factory):
+    # set migration_timeout to a very small value so the source node detects a timeout during sync
+    instances = [
+        df_factory.create(
+            port=BASE_PORT + i,
+            admin_port=BASE_PORT + i + 1000,
+            migration_timeout=100,
+            vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
+        )
+        for i in range(2)
+    ]
+
+    df_factory.start_all(instances)
+
+    nodes = [(await create_node_info(instance)) for instance in instances]
+    nodes[0].slots = [(0, 16383)]
+    nodes[1].slots = []
+
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug("source node DEBUG POPULATE")
+    await nodes[0].client.execute_command("debug", "populate", "200000", "foo", "1000")
+
+    seeder = df_seeder_factory.create(port=nodes[0].instance.port, cluster_mode=True)
+    seeder_task = asyncio.create_task(seeder.run(target_deviation=0.1))
+
+    await asyncio.sleep(0.5)  # wait for the seeder to start running
+
+    logging.debug("Start migration")
+    nodes[0].migrations.append(
+        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id)
+    )
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "SYNC")
+
+    logging.debug("debug migration pause")
+    await nodes[1].client.execute_command("debug migration pause")
+
+    await asyncio.sleep(1)
+
+    logging.debug("debug migration resume")
+    await nodes[1].client.execute_command("debug migration resume")
+
+    await asyncio.sleep(1)  # the migration will be restarted and resync
+    seeder.stop()
+    await seeder_task
+
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")