Skip to content

Commit

Permalink
Changed check_topology_and_refresh_if_diff to return a result
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Aug 26, 2024
1 parent 77999c6 commit eafaadb
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 14 deletions.
35 changes: 27 additions & 8 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,9 @@ where
Ok(())
}

/// Determines if the cluster topology has changed and refreshes slots and subscriptions if needed.
/// Returns a `RedisResult` containing `true` if changes were detected and slots were refreshed,
/// or `false` if no changes were found. Returns an error if refreshing the topology fails.
pub(crate) async fn check_topology_and_refresh_if_diff(
inner: Arc<InnerCore<C>>,
policy: &RefreshPolicy,
Expand All @@ -1423,14 +1426,30 @@ where
async fn periodic_topology_check(inner: Arc<InnerCore<C>>, interval_duration: Duration) {
loop {
let _ = boxed_sleep(interval_duration).await;
let topology_changed =
Self::check_topology_and_refresh_if_diff(inner.clone(), &RefreshPolicy::Throttable)
.await;
if !topology_changed {
// This serves as a safety measure for validating pubsub subscriptions state in case it has drifted
// while topology stayed the same.
// For example, a failed attempt to refresh a connection which is triggered from refresh_pubsub_subscriptions(),
// might leave a node unconnected indefinitely in case topology is stable and no requests are attempted to this node.

// Check and refresh topology if needed
let should_refresh_pubsub = match Self::check_topology_and_refresh_if_diff(
inner.clone(),
&RefreshPolicy::Throttable,
)
.await
{
Ok(topology_changed) => !topology_changed,
Err(err) => {
warn!(
"Failed to refresh slots during periodic topology checks:\n{:?}",
err
);
true
}
};

// Refresh pubsub subscriptions if topology wasn't changed or an error occurred.
// This serves as a safety measure for validating pubsub subscriptions state in case it has drifted
// while topology stayed the same.
// For example, a failed attempt to refresh a connection which is triggered from refresh_pubsub_subscriptions(),
// might leave a node unconnected indefinitely in case topology is stable and no requests are attempted to this node.
if should_refresh_pubsub {
Self::refresh_pubsub_subscriptions(inner.clone()).await;
}
}
Expand Down
29 changes: 23 additions & 6 deletions redis/src/commands/cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub(crate) trait ClusterInScan {
async fn are_all_slots_covered(&self) -> bool;

/// Check if the topology of the cluster has changed and refresh the slots if needed
async fn refresh_if_topology_changed(&self);
async fn refresh_if_topology_changed(&self) -> RedisResult<bool>;
}

/// Represents the state of a scan operation in a Redis cluster.
Expand Down Expand Up @@ -288,7 +288,16 @@ impl ScanState {
&mut self,
connection: &C,
) -> RedisResult<ScanState> {
let _ = connection.refresh_if_topology_changed().await;
connection
.refresh_if_topology_changed()
.await
.map_err(|err| {
RedisError::from((
ErrorKind::ResponseError,
"Error during cluster scan: failed to refresh slots",
format!("{:?}", err),
))
})?;
let mut scanned_slots_map = self.scanned_slots_map;
// If the address epoch changed, it means some slots in the address are new, so we can't know which slots were there from the beginning and which were added later (or moved out and back in).
// In this case we will skip updating the scanned_slots_map and will just update the address and the cursor
Expand Down Expand Up @@ -387,14 +396,14 @@ where
/// Reports whether every hash slot in the cluster is currently assigned to
/// some node, according to the cached slot map.
async fn are_all_slots_covered(&self) -> bool {
    let conn_guard = self.conn_lock.read().await;
    ClusterConnInner::<C>::check_if_all_slots_covered(&conn_guard.slot_map)
}
async fn refresh_if_topology_changed(&self) {
/// Compares the current cluster topology against the cached view and, when a
/// difference is detected, refreshes the slot map (and subscriptions) right away.
///
/// Returns `Ok(true)` if a change was found and slots were refreshed,
/// `Ok(false)` if nothing changed, or an error if the refresh itself failed.
async fn refresh_if_topology_changed(&self) -> RedisResult<bool> {
    // Cluster SCAN correctness depends on an up-to-date slot map, so the
    // refresh must not be throttled when a topology change is found.
    let refresh_policy = &RefreshPolicy::NotThrottable;
    ClusterConnInner::check_topology_and_refresh_if_diff(self.to_owned(), refresh_policy).await
}
}

Expand Down Expand Up @@ -529,7 +538,13 @@ where
{
// TODO: This mechanism of refreshing on failure to route to address should be part of the routing mechanism
// After the routing mechanism is updated to handle this case, this refresh in the case bellow should be removed
core.refresh_if_topology_changed().await;
core.refresh_if_topology_changed().await.map_err(|err| {
RedisError::from((
ErrorKind::ResponseError,
"Error during cluster scan: failed to refresh slots",
format!("{:?}", err),
))
})?;
if !core.are_all_slots_covered().await {
return Err(RedisError::from((
ErrorKind::NotAllSlotsCovered,
Expand Down Expand Up @@ -615,7 +630,9 @@ mod tests {
struct MockConnection;
#[async_trait]
impl ClusterInScan for MockConnection {
async fn refresh_if_topology_changed(&self) {}
/// Test double: always reports that a topology change was detected and
/// slots were refreshed, without touching any real cluster state.
async fn refresh_if_topology_changed(&self) -> RedisResult<bool> {
    Ok(true)
}
/// Test double: resolves every slot to the same fixed address, ignoring
/// the slot argument entirely.
async fn get_address_by_slot(&self, _slot: u16) -> RedisResult<String> {
    let fixed_address = String::from("mock_address");
    Ok(fixed_address)
}
Expand Down

0 comments on commit eafaadb

Please sign in to comment.