From 5b40be7079475cdb2b3cf45ef7180315ff495ef7 Mon Sep 17 00:00:00 2001 From: barshaul Date: Thu, 14 Nov 2024 09:07:01 +0000 Subject: [PATCH] Added missing tests Signed-off-by: barshaul --- .../redis/tests/test_cluster_async.rs | 439 +++++++++++++++++- 1 file changed, 436 insertions(+), 3 deletions(-) diff --git a/glide-core/redis-rs/redis/tests/test_cluster_async.rs b/glide-core/redis-rs/redis/tests/test_cluster_async.rs index 604638365d..968451a1fe 100644 --- a/glide-core/redis-rs/redis/tests/test_cluster_async.rs +++ b/glide-core/redis-rs/redis/tests/test_cluster_async.rs @@ -1412,8 +1412,8 @@ mod cluster_async { .. } = MockEnv::with_client_builder( ClusterClient::builder(vec![&*format!("redis://{name}")]) - .slots_refresh_rate_limit(Duration::from_secs(1000000), 0) // Rate limiter to disable slot refresh - .read_from_replicas(), // Allow reads from replicas + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0) // Rate limiter to disable slot refresh + .read_from_replicas(), // Allow reads from replicas name, move |cmd: &[u8], port| { if contains_slice(cmd, b"PING") @@ -1440,7 +1440,7 @@ mod cluster_async { )) } else { panic!("unexpected port for SET command: {port:?}.\n - Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); } } else if contains_slice(cmd, b"GET") { if new_shard_replica_port == port { @@ -1487,6 +1487,439 @@ mod cluster_async { assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); } + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_indicates_failover() { + // This test simulates a failover scenario, where the client receives a MOVED error and the replica becomes the new primary. + // The test verifies that the client updates the slot mapping to promote the replica to the primary and routes future requests + // to the new primary, ensuring other slots in the shard are also handled by the new primary. + let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_failover"; + let slots_config = vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![7001], + slot_range: (0..8000), + }, + MockSlotRange { + primary_port: 6380, + replica_ports: vec![7002], + slot_range: (8001..16380), + }, + ]; + + let moved_from_port = 6379; + let moved_to_port = 7001; + + // Tracking moved for validation + let moved_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_moved_requests = moved_requests.clone(); + + // Test key and slot + let key = "test"; + let key_slot = 6918; + + // Mock environment setup + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0), // Rate limiter to disable slot refresh + name, + move |cmd: &[u8], port| { + if contains_slice(cmd, b"PING") + || contains_slice(cmd, b"SETNAME") + || contains_slice(cmd, b"READONLY") + { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + let slots = create_topology_from_config(name, slots_config.clone()); + return Err(Ok(slots)); + } + + if contains_slice(cmd, b"SET") { + if port == moved_to_port { + // Simulate primary OK response + Err(Ok(Value::SimpleString("OK".into()))) + } else if port == moved_from_port { + // Simulate MOVED error for other port + moved_requests.fetch_add(1, Ordering::Relaxed); + Err(parse_redis_value( + format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(), + )) + } else { + panic!("unexpected port for SET command: {port:?}.\n + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + } + } else { + panic!("unexpected command {cmd:?}") + } + }, + ); + + // First request: Trigger MOVED error and reroute + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Second request: Should be routed directly to the new primary node if the slots map is updated + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Handle failover scenario: Ensure other slots in the same shard are updated to the new primary + let key_slot_1044 = "foo2"; + let value = runtime.block_on( + cmd("SET") + .arg(key_slot_1044) + .arg("bar2") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_indicates_new_primary() { + // This test simulates the scenario where the client receives a MOVED error indicating that the key now belongs to + // an entirely new primary node that wasn't previously known. The test verifies that the client correctly adds the new + // primary node to its slot map and routes future requests to the new node. + let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_new_primary"; + let slots_config = vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![], + slot_range: (0..8000), + }, + MockSlotRange { + primary_port: 6380, + replica_ports: vec![], + slot_range: (8001..16380), + }, + ]; + + let moved_from_port = 6379; + let moved_to_port = 6381; + + // Tracking moved for validation + let moved_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_moved_requests = moved_requests.clone(); + + // Test key and slot + let key = "test"; + let key_slot = 6918; + + // Mock environment setup + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0) // Rate limiter to disable slot refresh + .read_from_replicas(), // Allow reads from replicas + name, + move |cmd: &[u8], port| { + if contains_slice(cmd, b"PING") + || contains_slice(cmd, b"SETNAME") + || contains_slice(cmd, b"READONLY") + { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + let slots = create_topology_from_config(name, slots_config.clone()); + return Err(Ok(slots)); + } + + if contains_slice(cmd, b"SET") { + if port == moved_to_port { + // Simulate primary OK response + Err(Ok(Value::SimpleString("OK".into()))) + } else if port == moved_from_port { + // Simulate MOVED error for other port + moved_requests.fetch_add(1, Ordering::Relaxed); + Err(parse_redis_value( + format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(), + )) + } else { + panic!("unexpected port for SET command: {port:?}.\n + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + } + } else if contains_slice(cmd, b"GET") { + if moved_to_port == port { + // Simulate primary response for GET + Err(Ok(Value::BulkString(b"123".to_vec()))) + } else { + panic!( + "unexpected port for GET command: {port:?}, Expected: {moved_to_port}" + ); + } + } else { + panic!("unexpected command {cmd:?}") + } + }, + ); + + // First request: Trigger MOVED error and reroute + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Second request: Should be routed directly to the new primary node if the slots map is updated + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Third request: The new primary should have no replicas so it should be directed to it + let value = runtime.block_on( + cmd("GET") + .arg(key) + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(123))); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_indicates_replica_of_different_shard() { + // This test simulates a scenario where the client receives a MOVED error indicating that a key + // has been moved to a replica in a different shard. The replica is then promoted to primary and + // no longer exists in the shard’s replica set. + // The test validates that the key gets correctly routed to the new primary and ensures that the + // shard updates its mapping accordingly, with only one MOVED error encountered during the process. + + let name = "test_async_cluster_update_slots_based_on_moved_error_indicates_replica_of_different_shard"; + let slots_config = vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![7000], + slot_range: (0..8000), + }, + MockSlotRange { + primary_port: 6380, + replica_ports: vec![7001], + slot_range: (8001..16380), + }, + ]; + + let moved_from_port = 6379; + let moved_to_port = 7001; + let primary_shard2 = 6380; + + // Tracking moved for validation + let moved_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_moved_requests = moved_requests.clone(); + + // Test key and slot of the first shard + let key = "test"; + let key_slot = 6918; + + // Test key of the second shard + let key_shard2 = "foo"; // slot 12182 + + // Mock environment setup + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0) // Rate limiter to disable slot refresh + .read_from_replicas(), // Allow reads from replicas + name, + move |cmd: &[u8], port| { + if contains_slice(cmd, b"PING") + || contains_slice(cmd, b"SETNAME") + || contains_slice(cmd, b"READONLY") + { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + let slots = create_topology_from_config(name, slots_config.clone()); + return Err(Ok(slots)); + } + + if contains_slice(cmd, b"SET") { + if port == moved_to_port { + // Simulate primary OK response + Err(Ok(Value::SimpleString("OK".into()))) + } else if port == moved_from_port { + // Simulate MOVED error for other port + moved_requests.fetch_add(1, Ordering::Relaxed); + Err(parse_redis_value( + format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(), + )) + } else { + panic!("unexpected port for SET command: {port:?}.\n + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + } + } else if contains_slice(cmd, b"GET") { + if port == primary_shard2 { + // Simulate second shard primary response for GET + Err(Ok(Value::BulkString(b"123".to_vec()))) + } else { + panic!("unexpected port for GET command: {port:?}, Expected: {primary_shard2:?}"); + } + } else { + panic!("unexpected command {cmd:?}") + } + }, + ); + + // First request: Trigger MOVED error and reroute + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Second request: Should be routed directly to the new primary node if the slots map is updated + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Third request: Verify that the promoted replica is no longer part of the second shard replicas by + // ensuring the response is received from the shard's primary + let value = runtime.block_on( + cmd("GET") + .arg(key_shard2) + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(123))); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + + #[test] + fn test_async_cluster_update_slots_based_on_moved_error_no_change() { + // This test simulates a scenario where the client receives a MOVED error, but the new primary is the + // same as the old primary (no actual change). It ensures that no additional slot map + // updates are required and that the subsequent requests are still routed to the same primary node, with + // only one MOVED error encountered. + let name = "test_async_cluster_update_slots_based_on_moved_error_no_change"; + let slots_config = vec![ + MockSlotRange { + primary_port: 6379, + replica_ports: vec![7000], + slot_range: (0..8000), + }, + MockSlotRange { + primary_port: 6380, + replica_ports: vec![7001], + slot_range: (8001..16380), + }, + ]; + + let moved_from_port = 6379; + let moved_to_port = 6379; + + // Tracking moved for validation + let moved_requests = Arc::new(atomic::AtomicUsize::new(0)); + let cloned_moved_requests = moved_requests.clone(); + + // Test key and slot of the first shard + let key = "test"; + let key_slot = 6918; + + // Mock environment setup + let MockEnv { + runtime, + async_connection: mut connection, + handler: _handler, + .. + } = MockEnv::with_client_builder( + ClusterClient::builder(vec![&*format!("redis://{name}")]) + .slots_refresh_rate_limit(Duration::from_secs(1000000), 0), // Rate limiter to disable slot refresh + name, + move |cmd: &[u8], port| { + if contains_slice(cmd, b"PING") + || contains_slice(cmd, b"SETNAME") + || contains_slice(cmd, b"READONLY") + { + return Err(Ok(Value::SimpleString("OK".into()))); + } + + if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") { + let slots = create_topology_from_config(name, slots_config.clone()); + return Err(Ok(slots)); + } + + if contains_slice(cmd, b"SET") { + if port == moved_to_port { + if moved_requests.load(Ordering::Relaxed) == 0 { + moved_requests.fetch_add(1, Ordering::Relaxed); + Err(parse_redis_value( + format!("-MOVED {key_slot} {name}:{moved_to_port}\r\n").as_bytes(), + )) + } else { + Err(Ok(Value::SimpleString("OK".into()))) + } + } else { + panic!("unexpected port for SET command: {port:?}.\n + Expected one of: moved_to_port={moved_to_port}, moved_from_port={moved_from_port}"); + } + } else { + panic!("unexpected command {cmd:?}") + } + }, + ); + + // First request: Trigger MOVED error and reroute + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Second request: Should be still routed to the same primary node + let value = runtime.block_on( + cmd("SET") + .arg(key) + .arg("bar") + .query_async::<_, Option>(&mut connection), + ); + assert_eq!(value, Ok(Some(Value::SimpleString("OK".to_owned())))); + + // Assert there was only a single MOVED error + assert_eq!(cloned_moved_requests.load(Ordering::Relaxed), 1); + } + #[test] #[serial_test::serial] fn test_async_cluster_reconnect_even_with_zero_retries() {