Redis sentinel support #256
Comments
Thanks for the suggestion. Do you know what client-side changes are required to support Redis Sentinel? |
I think it's a different constructor, but otherwise the same. But this might not be necessary: traefik, and I think the Redis client as well, should handle disruptions to the redis server with retries, so availability shouldn't actually be affected. Redis being down doesn't disrupt the proxy; it should only disrupt changes to the proxy. |
It's a different constructor. I'm bringing this up because I ran into an issue. Logs:
This log repeated about 20 times:
|
Got it, thanks. Do you happen to know how long it took your redis server to come back, and if the client was able to recover eventually without being restarted? |
So, reproducing this is as easy as deleting redis with
My redis instance has the following liveness and readiness probes
No matter how long I wait, the client never recovers, and I have to restart the hub. |
Can you set |
I think Sentinel support makes plenty of sense, we just need to figure out what the config options should look like. |
Unfortunately, using that didn't help. I looked for more logs and found these after deleting the redis pod. Is it mandatory to have this root key? My redis node doesn't have persistent storage, so after a restart everything is lost.
|
I don't think it will work without persistence. It is at least specified as required, along with keyspace notifications. Redis is not a stateless communication medium, it is the state of the proxy, so it should be persisted. Persistence is the main reason to use a KV store backend for the proxy. We might be able to make it work (I don't think it does now, see #247, #242), but clearing the config stored in redis will definitely always be a massive disruption (more so than redis not running at all), so requiring persistence is the best way to ensure sensible behavior. |
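For reference, the two server-side settings mentioned here (persistence and keyspace notifications) are standard Redis options. A hedged `redis.conf` sketch; the event-class value `KEA` is just an example of enabling all notifications, not this repo's documented requirement:

```
# Enable AOF persistence so the proxy state stored in redis survives restarts
appendonly yes
# Enable keyspace notifications ("KEA" = all key events, as an example)
notify-keyspace-events KEA
```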
Persistence would be best, but in cases where the storage options for it aren't good, could a sidecar container service be used to prompt the hub to sync the proxy by doing a |
That should be addressable with #247. Right now, it restores the routes but it doesn't restore the initial dynamic config, which is also required. |
OK. The sidecar hub prompt is a work-around we've used "successfully" with etcd, but hadn't tried it with Redis so hadn't run into #247 in that context. Hoping we don't have to resort to using it again... |
If you have persistence, it won't be necessary. Persistence greatly improves reliability. But as part of #247 we could also look at using a watch to notice disruptions rather than needing to wait for a prod. |
So I have worked on implementing basic Sentinel support, but I'm getting the same error as before: spawn failed, plus the same recurring log. Do you know what the issue could be? All key-values are stored in redis except when creating a new server. Here is my code, config, and redis keys:

```python
import asyncio
from urllib.parse import urlparse

from jupyterhub_traefik_proxy.redis import TraefikRedisProxy
from jupyterhub_traefik_proxy.traefik_utils import deep_merge
from redis.exceptions import ConnectionError
from traitlets import Any, Int, List, Tuple, Unicode

try:
    from redis.asyncio.sentinel import Sentinel
except ImportError:
    raise ImportError(
        "Please install the `redis` package to use traefik-proxy with redis sentinel"
    )


class TraefikRedisSentinelProxy(TraefikRedisProxy):
    provider_name = "redis_sentinel"

    sentinel_hosts = List(
        trait=Tuple(),
        config=True,
        help="Sentinel hosts in the [(address, port), (address, port), ...] format",
    )
    master_name = Unicode(config=True, help="The name of the Sentinel master")
    max_reconnection_retries = Int(
        default_value=3,
        config=True,
        help="Maximum number of retries for reconnection attempts",
    )
    retry_interval = Int(
        default_value=1,
        config=True,
        help="Interval (in seconds) before retrying to get the master connection",
    )

    sentinel = Any()
    redis = Any()

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.log.info(
            f"sentinel hosts: {[(urlparse(url).netloc, port) for url, port in self.sentinel_hosts]}"
        )
        self.sentinel = Sentinel(
            [(urlparse(url).netloc, port) for url, port in self.sentinel_hosts],
            decode_responses=True,
        )
        self.redis = self.get_master_connection()

    def get_master_connection(self):
        self.log.info("Obtaining master server from sentinel")
        return self.sentinel.master_for(self.master_name)

    def get_slave_connection(self):
        self.log.info("Obtaining slave server from sentinel")
        return self.sentinel.slave_for(self.master_name)

    def _setup_traefik_static_config(self):
        self.log.info(
            "Setting up the redis provider and sentinel in the traefik static config"
        )
        redis_config = {
            "endpoints": [
                f"{urlparse(url).netloc}:{port}" for url, port in self.sentinel_hosts
            ],
            "rootKey": self.kv_traefik_prefix,
            "sentinel": {"masterName": self.master_name},
        }
        self.static_config = deep_merge(
            self.static_config, {"providers": {"redis": redis_config}}
        )
        self.log.info(f"Final static config: {self.static_config}")
        # Skip TraefikRedisProxy's own static config setup, which would
        # configure a direct (non-sentinel) redis endpoint
        return super(TraefikRedisProxy, self)._setup_traefik_static_config()

    async def execute_with_reconnection(self, func, *args, **kwargs):
        """
        Execute a function with automatic reconnection handling.

        :param func: The Redis command to execute.
        :param args: Arguments for the Redis command.
        :param kwargs: Keyword arguments for the Redis command.
        """
        for attempt in range(self.max_reconnection_retries):
            try:
                return await func(*args, **kwargs)
            except ConnectionError as e:
                self.log.error(
                    f"Error during Redis operation: {e}. "
                    f"Attempt {attempt + 1} of {self.max_reconnection_retries}. "
                    "Reconnecting to master..."
                )
                self.redis = self.get_master_connection()
                await asyncio.sleep(self.retry_interval)
        raise ConnectionError(
            f"Failed to reconnect after {self.max_reconnection_retries} attempts"
        )

    async def _kv_get_tree(self, prefix):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_get_tree(prefix)

        return await self.execute_with_reconnection(wrapped)

    async def _kv_atomic_set(self, to_set: dict):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_atomic_set(to_set)

        return await self.execute_with_reconnection(wrapped)

    async def _kv_atomic_delete(self, *keys):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_atomic_delete(*keys)

        return await self.execute_with_reconnection(wrapped)
```

Same redis configuration except `redis_url`, plus these two extra variables:
Keys stored in redis
|
Okay, found the issue. I still need to use `provider_name = "redis"`. Fixed code:

```python
import asyncio
from urllib.parse import urlparse

from jupyterhub_traefik_proxy.redis import TraefikRedisProxy
from jupyterhub_traefik_proxy.traefik_utils import deep_merge
from redis.exceptions import ConnectionError
from traitlets import Any, Int, List, Tuple, Unicode

try:
    from redis.asyncio.sentinel import Sentinel
except ImportError:
    raise ImportError(
        "Please install the `redis` package to use traefik-proxy with redis sentinel"
    )


class TraefikRedisSentinelProxy(TraefikRedisProxy):
    # The traefik provider must still be named "redis", even with sentinel
    provider_name = "redis"

    sentinel_hosts = List(
        trait=Tuple(),
        config=True,
        help="Sentinel hosts in the [(address, port), (address, port), ...] format",
    )
    master_name = Unicode(config=True, help="The name of the Sentinel master")
    max_reconnection_retries = Int(
        default_value=3,
        config=True,
        help="Maximum number of retries for reconnection attempts",
    )
    retry_interval = Int(
        default_value=1,
        config=True,
        help="Interval (in seconds) before retrying to get the master connection",
    )

    sentinel = Any()
    redis = Any()

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.log.info(
            f"sentinel hosts: {[(urlparse(url).netloc, port) for url, port in self.sentinel_hosts]}"
        )
        self.sentinel = Sentinel(
            [(urlparse(url).netloc, port) for url, port in self.sentinel_hosts],
            decode_responses=True,
        )
        self.redis = self.get_master_connection()

    def get_master_connection(self):
        self.log.info("Obtaining master server from sentinel")
        return self.sentinel.master_for(self.master_name)

    def get_slave_connection(self):
        self.log.info("Obtaining slave server from sentinel")
        return self.sentinel.slave_for(self.master_name)

    def _setup_traefik_static_config(self):
        self.log.info(
            "Setting up the redis provider and sentinel in the traefik static config"
        )
        redis_config = {
            "endpoints": [
                f"{urlparse(url).netloc}:{port}" for url, port in self.sentinel_hosts
            ],
            "rootKey": self.kv_traefik_prefix,
            "sentinel": {"masterName": self.master_name},
        }
        self.static_config = deep_merge(
            self.static_config, {"providers": {"redis": redis_config}}
        )
        self.log.info(f"Final static config: {self.static_config}")
        # Skip TraefikRedisProxy's own static config setup, which would
        # configure a direct (non-sentinel) redis endpoint
        return super(TraefikRedisProxy, self)._setup_traefik_static_config()

    async def execute_with_reconnection(self, func, *args, **kwargs):
        """
        Execute a function with automatic reconnection handling.

        :param func: The Redis command to execute.
        :param args: Arguments for the Redis command.
        :param kwargs: Keyword arguments for the Redis command.
        """
        for attempt in range(self.max_reconnection_retries):
            try:
                return await func(*args, **kwargs)
            except ConnectionError as e:
                self.log.error(
                    f"Error during Redis operation: {e}. "
                    f"Attempt {attempt + 1} of {self.max_reconnection_retries}. "
                    "Reconnecting to master..."
                )
                self.redis = self.get_master_connection()
                await asyncio.sleep(self.retry_interval)
        raise ConnectionError(
            f"Failed to reconnect after {self.max_reconnection_retries} attempts"
        )

    async def _kv_get_tree(self, prefix):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_get_tree(prefix)

        return await self.execute_with_reconnection(wrapped)

    async def _kv_atomic_set(self, to_set: dict):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_atomic_set(to_set)

        return await self.execute_with_reconnection(wrapped)

    async def _kv_atomic_delete(self, *keys):
        async def wrapped():
            return await super(TraefikRedisSentinelProxy, self)._kv_atomic_delete(*keys)

        return await self.execute_with_reconnection(wrapped)
```
|
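For anyone wanting to try this, a hypothetical `jupyterhub_config.py` fragment wiring up a custom class like the one above might look like this; the module path, hostnames, and master name are all placeholders, not values from this thread:

```python
# jupyterhub_config.py (sketch; all names below are placeholders)
c.JupyterHub.proxy_class = "mymodule.TraefikRedisSentinelProxy"
c.TraefikRedisSentinelProxy.sentinel_hosts = [
    ("redis://sentinel-0.example.local", 26379),
    ("redis://sentinel-1.example.local", 26379),
    ("redis://sentinel-2.example.local", 26379),
]
c.TraefikRedisSentinelProxy.master_name = "mymaster"
```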
Thanks! I don't think we need a new class; I think we just need to add the sentinel configuration. Sentinel claims it will handle reconnects and should fail over automatically with our existing Retry configuration. I'm having some difficulty setting up a local sentinel deployment, but I'll test when I get the chance. Knowing that your code works is a big help! |
Proposed change
Hey, thanks to the people working on this repo; amazing work!
My understanding is that we want to use traefik + redis to offer high availability for the proxy. Right now, Redis is a single point of failure: the current implementation only allows connecting directly to the instance hosting it, so when redis is down, every component that relies on it, directly or indirectly, stops working.
I would like to propose support for Redis Sentinel for real HA.
Thanks