Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions reflex/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def get_url(self) -> str:


# These vars are not logged because they may contain sensitive information.
_sensitive_env_vars = {"DB_URL", "ASYNC_DB_URL", "REDIS_URL"}
_sensitive_env_vars = {"DB_URL", "ASYNC_DB_URL", "REDIS_URL", "REDIS_SENTINEL_PASSWORD", "REDIS_SENTINEL_MASTER_PASSWORD"}


@dataclasses.dataclass(kw_only=True)
Expand Down Expand Up @@ -187,6 +187,24 @@ class BaseConfig:
# The redis url
redis_url: str | None = None

# Comma-separated list of Redis Sentinel host:port pairs (e.g. "host1:26379,host2:26379").
redis_sentinel_nodes: str | None = None

# The Redis Sentinel service (master) name.
redis_sentinel_service: str | None = None

# Password for authenticating with Redis Sentinel instances (optional).
redis_sentinel_password: str | None = None

# Password for authenticating with the Redis master/replicas discovered via Sentinel (optional).
redis_sentinel_master_password: str | None = None

# Redis database number to use when connecting via Sentinel.
redis_sentinel_db: int = 0

# Whether to use SSL/TLS when connecting to Redis Sentinel.
redis_sentinel_ssl: bool = False

# Telemetry opt-in.
telemetry_enabled: bool = True

Expand Down Expand Up @@ -362,12 +380,19 @@ def _post_init(self, **kwargs):
self._non_default_attributes = set(kwargs.keys())
self._replace_defaults(**kwargs)

if (
self.state_manager_mode == constants.StateManagerMode.REDIS
and not self.redis_url
):
msg = f"{self._prefixes[0]}REDIS_URL is required when using the redis state manager."
raise ConfigError(msg)
if self.state_manager_mode == constants.StateManagerMode.REDIS:
has_redis_url = bool(self.redis_url)
has_sentinel = bool(
self.redis_sentinel_nodes and self.redis_sentinel_service
)
if not has_redis_url and not has_sentinel:
msg = (
f"{self._prefixes[0]}REDIS_URL or "
f"{self._prefixes[0]}REDIS_SENTINEL_NODES and "
f"{self._prefixes[0]}REDIS_SENTINEL_SERVICE "
"are required when using the redis state manager."
)
raise ConfigError(msg)

def _add_builtin_plugins(self):
"""Add the builtin plugins to the config."""
Expand Down
5 changes: 4 additions & 1 deletion reflex/istate/manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ def create(cls, state: type[BaseState]):
The state manager (either disk, memory or redis).
"""
config = get_config()
if prerequisites.parse_redis_url() is not None:
if (
prerequisites.parse_redis_url() is not None
or prerequisites.get_sentinel_config() is not None
):
config.state_manager_mode = constants.StateManagerMode.REDIS
if config.state_manager_mode == constants.StateManagerMode.MEMORY:
from reflex.istate.manager.memory import StateManagerMemory
Expand Down
106 changes: 106 additions & 0 deletions reflex/utils/prerequisites.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,70 @@ def compile_or_validate_app(
return True


def _parse_sentinel_nodes(nodes_str: str) -> list[tuple[str, int]]:
"""Parse a comma-separated list of sentinel host:port pairs.

Args:
nodes_str: Comma-separated sentinel addresses (e.g. "host1:26379,host2:26379").

Returns:
List of (host, port) tuples.

Raises:
ValueError: If the format is invalid.
"""
sentinels = []
for node in nodes_str.split(","):
node = node.strip()
if not node:
continue
parts = node.rsplit(":", 1)
if len(parts) != 2:
msg = f"Invalid sentinel node format: {node!r}. Expected 'host:port'."
raise ValueError(msg)
host, port_str = parts
if not host:
msg = f"Invalid sentinel node format: {node!r}. Host cannot be empty."
raise ValueError(msg)
try:
port = int(port_str)
except ValueError:
msg = f"Invalid sentinel port: {port_str!r} in node {node!r}."
raise ValueError(msg) from None
sentinels.append((host, port))
if not sentinels:
msg = f"No valid sentinel nodes found in: {nodes_str!r}."
raise ValueError(msg)
return sentinels


def get_sentinel_config() -> tuple[list[tuple[str, int]], str, str | None, str | None, int, bool] | None:
"""Get Redis Sentinel configuration from the app config if set.

Returns:
A tuple of (sentinel_nodes, service_name, sentinel_password, master_password, db, ssl)
or None if not configured.
"""
config = get_config()
if not config.redis_sentinel_nodes or not config.redis_sentinel_service:
return None
nodes = _parse_sentinel_nodes(config.redis_sentinel_nodes)
return (
nodes,
config.redis_sentinel_service,
config.redis_sentinel_password,
config.redis_sentinel_master_password,
config.redis_sentinel_db,
config.redis_sentinel_ssl,
)


def get_redis() -> Redis | None:
"""Get the asynchronous redis client.

If Redis Sentinel is configured, returns a client connected to the sentinel master.
Otherwise falls back to direct redis_url connection.

Returns:
The asynchronous redis client.
"""
Expand All @@ -377,6 +438,27 @@ def get_redis() -> Redis | None:
except ImportError:
console.debug("Redis package not installed.")
return None

sentinel_config = get_sentinel_config()
if sentinel_config is not None:
from redis.asyncio.sentinel import Sentinel

nodes, service_name, sentinel_password, master_password, db, ssl = sentinel_config
sentinel_kwargs = {}
if sentinel_password:
sentinel_kwargs["password"] = sentinel_password
sentinel = Sentinel(
nodes,
sentinel_kwargs=sentinel_kwargs,
retry_on_error=[RedisError],
)
return sentinel.master_for(
service_name,
db=db,
password=master_password,
ssl=ssl,
)

if (redis_url := parse_redis_url()) is not None:
return Redis.from_url(
redis_url,
Expand All @@ -388,6 +470,9 @@ def get_redis() -> Redis | None:
def get_redis_sync() -> RedisSync | None:
"""Get the synchronous redis client.

If Redis Sentinel is configured, returns a client connected to the sentinel master.
Otherwise falls back to direct redis_url connection.

Returns:
The synchronous redis client.
"""
Expand All @@ -397,6 +482,27 @@ def get_redis_sync() -> RedisSync | None:
except ImportError:
console.debug("Redis package not installed.")
return None

sentinel_config = get_sentinel_config()
if sentinel_config is not None:
from redis.sentinel import Sentinel

nodes, service_name, sentinel_password, master_password, db, ssl = sentinel_config
sentinel_kwargs = {}
if sentinel_password:
sentinel_kwargs["password"] = sentinel_password
sentinel = Sentinel(
nodes,
sentinel_kwargs=sentinel_kwargs,
retry_on_error=[RedisError],
)
return sentinel.master_for(
service_name,
db=db,
password=master_password,
ssl=ssl,
)

if (redis_url := parse_redis_url()) is not None:
return RedisSync.from_url(
redis_url,
Expand Down