fix(cluster): crash in cluster migration #4495

Merged · 1 commit · Jan 22, 2025
7 changes: 7 additions & 0 deletions src/core/dash.h
@@ -409,6 +409,13 @@ class DashTable<_Key, _Value, Policy>::Iterator {
return *this;
}

Iterator& AdvanceIfNotOccupied() {
if (!IsOccupied()) {
this->operator++();
}
return *this;
}

IteratorPairType operator->() const {
auto* seg = owner_->segment_[seg_id_];
return {seg->Key(bucket_id_, slot_id_), seg->Value(bucket_id_, slot_id_)};
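Why the helper is needed (a reviewer's note): the bucket iterators handed to WriteBucket / SerializeBucket can start on a slot that is no longer occupied, for example when an entry is erased while big-value serialization preempts, and dereferencing such a slot is the kind of crash this PR guards against. The snippet below is a minimal toy model of the idiom, assuming simplified Slot / ToyBucketIterator types invented for illustration; it is not the real DashTable implementation.

// Toy model of the bucket-iteration idiom introduced by this PR.
// Slot and ToyBucketIterator are illustrative stand-ins, not Dragonfly's DashTable.
#include <array>
#include <cassert>
#include <cstddef>
#include <string>

struct Slot {
  bool occupied = false;
  std::string key, value;
};

class ToyBucketIterator {
 public:
  explicit ToyBucketIterator(std::array<Slot, 8>* bucket) : bucket_(bucket) {}

  bool IsOccupied() const { return (*bucket_)[idx_].occupied; }
  bool is_done() const { return idx_ >= bucket_->size(); }

  // operator++ already lands on the next occupied slot (or past the end).
  ToyBucketIterator& operator++() {
    do {
      ++idx_;
    } while (!is_done() && !IsOccupied());
    return *this;
  }

  // Mirrors the new DashTable helper: if the *current* slot is empty
  // (e.g. its entry was erased while the caller preempted), move on.
  ToyBucketIterator& AdvanceIfNotOccupied() {
    if (!is_done() && !IsOccupied()) {
      this->operator++();
    }
    return *this;
  }

  const Slot& operator*() const {
    assert(IsOccupied());  // dereferencing an empty slot is the hazard being fixed
    return (*bucket_)[idx_];
  }

 private:
  std::array<Slot, 8>* bucket_;
  std::size_t idx_ = 0;
};

int main() {
  std::array<Slot, 8> bucket{};
  bucket[2] = {true, "k2", "v2"};
  bucket[5] = {true, "k5", "v5"};

  // Slot 0 is unoccupied, so the old `for (; !it.is_done(); ++it)` and
  // `while (!it.is_done())` loops would touch an empty slot on entry.
  // The fixed pattern skips ahead first:
  ToyBucketIterator it(&bucket);
  unsigned serialized = 0;
  for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it) {
    ++serialized;        // the loop body now only ever sees occupied slots
    (void)(*it).key;
  }
  assert(serialized == 2);
  return 0;
}

The point of the helper is to make that preamble explicit instead of relying on the bucket's first slot being populated.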
4 changes: 2 additions & 2 deletions src/server/journal/streamer.cc
@@ -231,7 +231,7 @@ void RestoreStreamer::Run() {
ThisFiber::Yield();
last_yield = 0;
}
- } while (cursor);
+ } while (cursor && !fiber_cancelled_);

VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString()
<< ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop;
@@ -302,7 +302,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {

it.SetVersion(snapshot_version_);
string key_buffer; // we can reuse it
- for (; !it.is_done(); ++it) {
+ for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it) {
const auto& pv = it->second;
string_view key = it->first.GetSlice(&key_buffer);
if (ShouldWrite(key)) {
3 changes: 1 addition & 2 deletions src/server/snapshot.cc
@@ -291,11 +291,10 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
it.SetVersion(snapshot_version_);
unsigned result = 0;

- while (!it.is_done()) {
+ for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it) {
++result;
// might preempt due to big value serialization.
SerializeEntry(db_index, it->first, it->second);
- ++it;
}
serialize_bucket_running_ = false;
return result;
77 changes: 77 additions & 0 deletions tests/dragonfly/cluster_test.py
@@ -2685,3 +2685,80 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

assert (await StaticSeeder.capture(nodes[1].client)) == start_capture


"""
Test cluster node distributing its slots into 2 other nodes.
In this test we start migrating to the second node only after the first one finished to
reproduce the bug found in issue #4455
"""


@pytest.mark.asyncio
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_migration_one_after_another(df_factory: DflyInstanceFactory, df_seeder_factory):
# 1. Create cluster of 3 nodes with all slots allocated to first node.
instances = [
df_factory.create(
port=next(next_port),
admin_port=next(next_port),
vmodule="outgoing_slot_migration=2,cluster_family=2,incoming_slot_migration=2,streamer=2",
)
for i in range(3)
]
df_factory.start_all(instances)

nodes = [(await create_node_info(instance)) for instance in instances]
nodes[0].slots = [(0, 16383)]
nodes[1].slots = []
nodes[2].slots = []
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

logging.debug("DEBUG POPULATE first node")
key_num = 100000
await StaticSeeder(key_target=key_num, data_size=100).run(nodes[0].client)
dbsize_node0 = await nodes[0].client.dbsize()
assert dbsize_node0 > (key_num * 0.95)

# 2. Start migrating part of the slots from the first node to the second
logging.debug("Start first migration")
nodes[0].migrations.append(
MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16300)], nodes[1].id)
)
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

# 3. Wait for the first migration to finish
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=50)
await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED", timeout=50)

nodes[0].migrations = []
nodes[0].slots = [(16301, 16383)]
nodes[1].slots = [(0, 16300)]
nodes[2].slots = []
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

# 4. Start migrating the remaining slots from the first node to the third node
logging.debug("Start second migration")
nodes[0].migrations.append(
MigrationInfo("127.0.0.1", nodes[2].instance.admin_port, [(16301, 16383)], nodes[2].id)
)
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

# 5. Wait for the second migration to finish
await wait_for_status(nodes[0].admin_client, nodes[2].id, "FINISHED", timeout=10)
await wait_for_status(nodes[2].admin_client, nodes[0].id, "FINISHED", timeout=10)

nodes[0].migrations = []
nodes[0].slots = []
nodes[1].slots = [(0, 16300)]
nodes[2].slots = [(16301, 16383)]
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

# 6. Check that all data was migrated
# Use dbsize to verify that all the data was migrated to the other nodes.
# Note: we cannot use the seeder capture here because the data is migrated to 2 different nodes.
# TODO: improve the migration correctness check by running the seeder capture per slot range (requires changes in the capture script).
dbsize_node1 = await nodes[1].client.dbsize()
dbsize_node2 = await nodes[2].client.dbsize()
assert dbsize_node1 + dbsize_node2 == dbsize_node0
assert dbsize_node2 > 0 and dbsize_node1 > 0
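
To run only the new test locally (assuming the repository's usual pytest-based setup for tests/dragonfly and a locally built dragonfly binary), pytest's -k filter can select it by name:

pytest tests/dragonfly/cluster_test.py -k test_migration_one_after_another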