import socket import redis from redis.connection import SSLConnection from redis.sentinel import Sentinel from configs import dify_config class RedisClientWrapper(redis.Redis): """ A wrapper class for the Redis client that addresses the issue where the global `redis_client` variable cannot be updated when a new Redis instance is returned by Sentinel. This class allows for deferred initialization of the Redis client, enabling the client to be re-initialized with a new instance when necessary. This is particularly useful in scenarios where the Redis instance may change dynamically, such as during a failover in a Sentinel-managed Redis setup. Attributes: _client (redis.Redis): The actual Redis client instance. It remains None until initialized with the `initialize` method. Methods: initialize(client): Initializes the Redis client if it hasn't been initialized already. __getattr__(item): Delegates attribute access to the Redis client, raising an error if the client is not initialized. """ def __init__(self): self._client = None def initialize(self, client): if self._client is None: self._client = client def __getattr__(self, item): if self._client is None: raise RuntimeError("Redis client is not initialized. Call init_app first.") return getattr(self._client, item) redis_client = RedisClientWrapper() class GeventSafeConnection(redis.Connection): socket_socket_class: type[socket.socket] | None = None def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def _connect(self): "Create a TCP socket connection" # we want to mimic what socket.create_connection does to support # ipv4/ipv6, but we want to set options prior to calling # socket.connect() err = None for res in socket.getaddrinfo(self.host, self.port, self.socket_type, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: socket_socket_class = self.socket_socket_class or socket.socket sock = socket_socket_class(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP_KEEPALIVE if self.socket_keepalive: sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in self.socket_keepalive_options.items(): sock.setsockopt(socket.IPPROTO_TCP, k, v) # set the socket_connect_timeout before we connect sock.settimeout(self.socket_connect_timeout) # connect sock.connect(socket_address) # set the socket_timeout now that we're connected sock.settimeout(self.socket_timeout) return sock except OSError as _: err = _ if sock is not None: sock.close() if err is not None: raise err raise OSError("socket.getaddrinfo returned an empty list") def init_app(app): global redis_client connection_class = GeventSafeConnection if dify_config.REDIS_USE_SSL: connection_class = SSLConnection redis_params = { "username": dify_config.REDIS_USERNAME, "password": dify_config.REDIS_PASSWORD, "db": dify_config.REDIS_DB, "encoding": "utf-8", "encoding_errors": "strict", "decode_responses": False, } if dify_config.REDIS_USE_SENTINEL: sentinel_hosts = [ (node.split(":")[0], int(node.split(":")[1])) for node in dify_config.REDIS_SENTINELS.split(",") ] sentinel = Sentinel( sentinel_hosts, sentinel_kwargs={ "socket_timeout": dify_config.REDIS_SENTINEL_SOCKET_TIMEOUT, "username": dify_config.REDIS_SENTINEL_USERNAME, "password": dify_config.REDIS_SENTINEL_PASSWORD, }, ) master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params) redis_client.initialize(master) else: redis_params.update( { "host": dify_config.REDIS_HOST, "port": dify_config.REDIS_PORT, "connection_class": connection_class, } ) pool = redis.ConnectionPool(**redis_params) redis_client.initialize(redis.Redis(connection_pool=pool)) app.extensions["redis"] = redis_client