From 84a75ffbc0a30d33ffc8d9add761277121de72e9 Mon Sep 17 00:00:00 2001
From: adi_holden
Date: Wed, 22 Jan 2025 12:19:05 +0200
Subject: [PATCH] fix cluster: crash in cluster migration

Signed-off-by: adi_holden
---
 src/core/dash.h                 |  7 +++
 src/server/journal/streamer.cc  |  4 +-
 src/server/snapshot.cc          |  3 +-
 tests/dragonfly/cluster_test.py | 77 +++++++++++++++++++++++++++++++++
 4 files changed, 87 insertions(+), 4 deletions(-)

diff --git a/src/core/dash.h b/src/core/dash.h
index 05a99560f21a..de7a025d44d3 100644
--- a/src/core/dash.h
+++ b/src/core/dash.h
@@ -409,6 +409,13 @@ class DashTable<_Key, _Value, Policy>::Iterator {
     return *this;
   }
 
+  Iterator& AdvanceIfNotOccupied() {
+    if (!IsOccupied()) {
+      this->operator++();
+    }
+    return *this;
+  }
+
   IteratorPairType operator->() const {
     auto* seg = owner_->segment_[seg_id_];
     return {seg->Key(bucket_id_, slot_id_), seg->Value(bucket_id_, slot_id_)};
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 7cc3e038e2e1..9dd2d9472d6c 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -231,7 +231,7 @@ void RestoreStreamer::Run() {
       ThisFiber::Yield();
       last_yield = 0;
     }
-  } while (cursor);
+  } while (cursor && !fiber_cancelled_);
 
   VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString()
           << ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop;
@@ -302,7 +302,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
     it.SetVersion(snapshot_version_);
     string key_buffer;  // we can reuse it
 
-    for (; !it.is_done(); ++it) {
+    for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it) {
      const auto& pv = it->second;
      string_view key = it->first.GetSlice(&key_buffer);
      if (ShouldWrite(key)) {
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index 4c2abbe51a23..c6c64261abb9 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -291,11 +291,10 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
   it.SetVersion(snapshot_version_);
   unsigned result = 0;
 
-  while (!it.is_done()) {
+  for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it) {
     ++result;
     // might preempt due to big value serialization.
     SerializeEntry(db_index, it->first, it->second);
-    ++it;
   }
   serialize_bucket_running_ = false;
   return result;
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index 9f0cbbd46ec2..76de4918364a 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -2685,3 +2685,80 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     assert (await StaticSeeder.capture(nodes[1].client)) == start_capture
+
+
+"""
+Test a cluster node distributing its slots to 2 other nodes.
+In this test we start migrating to the second node only after the first migration has
+finished, to reproduce the bug found in issue #4455.
+"""
+
+
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+async def test_migration_one_after_another(df_factory: DflyInstanceFactory, df_seeder_factory):
+    # 1. Create a cluster of 3 nodes with all slots allocated to the first node.
+    instances = [
+        df_factory.create(
+            port=next(next_port),
+            admin_port=next(next_port),
+            vmodule="outgoing_slot_migration=2,cluster_family=2,incoming_slot_migration=2,streamer=2",
+        )
+        for i in range(3)
+    ]
+    df_factory.start_all(instances)
+
+    nodes = [(await create_node_info(instance)) for instance in instances]
+    nodes[0].slots = [(0, 16383)]
+    nodes[1].slots = []
+    nodes[2].slots = []
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug("DEBUG POPULATE first node")
+    key_num = 100000
+    await StaticSeeder(key_target=key_num, data_size=100).run(nodes[0].client)
+    dbsize_node0 = await nodes[0].client.dbsize()
+    assert dbsize_node0 > (key_num * 0.95)
+
+    # 2. Start migrating part of the slots from the first node to the second node.
+    logging.debug("Start first migration")
+    nodes[0].migrations.append(
+        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16300)], nodes[1].id)
+    )
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    # 3. Wait for the first migration to finish.
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=50)
+    await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED", timeout=50)
+
+    nodes[0].migrations = []
+    nodes[0].slots = [(16301, 16383)]
+    nodes[1].slots = [(0, 16300)]
+    nodes[2].slots = []
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    # 4. Start migrating the remaining slots from the first node to the third node.
+    logging.debug("Start second migration")
+    nodes[0].migrations.append(
+        MigrationInfo("127.0.0.1", nodes[2].instance.admin_port, [(16301, 16383)], nodes[2].id)
+    )
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    # 5. Wait for the second migration to finish.
+    await wait_for_status(nodes[0].admin_client, nodes[2].id, "FINISHED", timeout=10)
+    await wait_for_status(nodes[2].admin_client, nodes[0].id, "FINISHED", timeout=10)
+
+    nodes[0].migrations = []
+    nodes[0].slots = []
+    nodes[1].slots = [(0, 16300)]
+    nodes[2].slots = [(16301, 16383)]
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    # 6. Check that all the data was migrated.
+    # Using dbsize to check that all the data was migrated to the other nodes.
+    # Note: we cannot use the seeder capture because the data is migrated to 2 different nodes.
+    # TODO: improve the migration correctness check by running the seeder capture on a slot range (requires changes in the capture script).
+    dbsize_node1 = await nodes[1].client.dbsize()
+    dbsize_node2 = await nodes[2].client.dbsize()
+    assert dbsize_node1 + dbsize_node2 == dbsize_node0
+    assert dbsize_node2 > 0 and dbsize_node1 > 0
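
Note on the core change: WriteBucket() in streamer.cc and SerializeBucket() in snapshot.cc receive a bucket iterator whose starting slot may no longer be occupied, for example if entries were erased while big-value serialization preempted the fiber (the existing comment in SerializeBucket() hints at this). Iterator::operator++ advances to the next occupied slot and does not re-validate the slot the iterator currently points at, so the old loop shapes could dereference an empty slot on the first iteration. AdvanceIfNotOccupied() moves off an empty starting slot before the first dereference, and RestoreStreamer::Run() additionally stops iterating once the fiber is cancelled. The sketch below is a minimal, self-contained illustration of that loop shape; Bucket and BucketIterator are hypothetical simplifications, not Dragonfly's real DashTable types.

// Minimal sketch of the iteration pattern the patch introduces.
#include <array>
#include <bitset>
#include <cstddef>
#include <iostream>
#include <string>

struct Bucket {
  static constexpr std::size_t kSlots = 4;
  std::bitset<kSlots> occupied;          // which slots hold live entries
  std::array<std::string, kSlots> keys;  // slot payloads
};

class BucketIterator {
 public:
  BucketIterator(Bucket* b, std::size_t slot) : bucket_(b), slot_(slot) {
  }

  bool is_done() const {
    return slot_ >= Bucket::kSlots;
  }

  bool IsOccupied() const {
    return bucket_->occupied[slot_];
  }

  // operator++ always moves past the current slot to the next occupied one;
  // it never re-checks the slot the iterator currently points at.
  BucketIterator& operator++() {
    ++slot_;
    while (!is_done() && !bucket_->occupied[slot_])
      ++slot_;
    return *this;
  }

  // The helper added by the patch: advance only when the current slot is
  // empty, so an occupied starting slot is not skipped.
  BucketIterator& AdvanceIfNotOccupied() {
    if (!is_done() && !IsOccupied())
      this->operator++();
    return *this;
  }

  // Dereferencing an unoccupied slot is the invalid access the fix avoids.
  const std::string& operator*() const {
    return bucket_->keys.at(slot_);
  }

 private:
  Bucket* bucket_;
  std::size_t slot_;
};

int main() {
  Bucket b;
  b.occupied[1] = true;  // slot 0 stays empty, e.g. its entry was erased
  b.occupied[3] = true;
  b.keys[1] = "k1";
  b.keys[3] = "k3";

  // Old loop shape: `for (; !it.is_done(); ++it)` would dereference slot 0
  // even though it holds no entry.

  // Fixed loop shape, mirroring WriteBucket() and SerializeBucket():
  BucketIterator it(&b, 0);
  for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it)
    std::cout << *it << '\n';  // prints k1 then k3
  return 0;
}

With the fixed loop the program prints k1 and k3 and never touches the empty slot 0; with the old `for (; !it.is_done(); ++it)` shape the first iteration would read slot 0's dead payload, which is the kind of access behind the reported crash.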