dify/api/extensions/ext_redis.py
2024-10-24 00:54:10 -07:00

137 lines
4.7 KiB
Python

import socket
import redis
from redis.connection import SSLConnection
from redis.sentinel import Sentinel
from configs import dify_config
class RedisClientWrapper(redis.Redis):
"""
A wrapper class for the Redis client that addresses the issue where the global
`redis_client` variable cannot be updated when a new Redis instance is returned
by Sentinel.
This class allows for deferred initialization of the Redis client, enabling the
client to be re-initialized with a new instance when necessary. This is particularly
useful in scenarios where the Redis instance may change dynamically, such as during
a failover in a Sentinel-managed Redis setup.
Attributes:
_client (redis.Redis): The actual Redis client instance. It remains None until
initialized with the `initialize` method.
Methods:
initialize(client): Initializes the Redis client if it hasn't been initialized already.
__getattr__(item): Delegates attribute access to the Redis client, raising an error
if the client is not initialized.
"""
def __init__(self):
self._client = None
def initialize(self, client):
if self._client is None:
self._client = client
def __getattr__(self, item):
if self._client is None:
raise RuntimeError("Redis client is not initialized. Call init_app first.")
print(self._client.connection_pool.get_connection("ping").socket_socket_class)
return getattr(self._client, item)
redis_client = RedisClientWrapper()
class GeventSafeConnection(redis.Connection):
socket_socket_class: type[socket.socket] | None = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _connect(self):
"Create a TCP socket connection"
# we want to mimic what socket.create_connection does to support
# ipv4/ipv6, but we want to set options prior to calling
# socket.connect()
err = None
for res in socket.getaddrinfo(self.host, self.port, self.socket_type, socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address = res
sock = None
try:
socket_socket_class = self.socket_socket_class or socket.socket
sock = socket_socket_class(family, socktype, proto)
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# TCP_KEEPALIVE
if self.socket_keepalive:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
for k, v in self.socket_keepalive_options.items():
sock.setsockopt(socket.IPPROTO_TCP, k, v)
# set the socket_connect_timeout before we connect
sock.settimeout(self.socket_connect_timeout)
# connect
sock.connect(socket_address)
# set the socket_timeout now that we're connected
sock.settimeout(self.socket_timeout)
return sock
except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
raise OSError("socket.getaddrinfo returned an empty list")
def init_app(app):
global redis_client
connection_class = GeventSafeConnection
if dify_config.REDIS_USE_SSL:
connection_class = SSLConnection
redis_params = {
"username": dify_config.REDIS_USERNAME,
"password": dify_config.REDIS_PASSWORD,
"db": dify_config.REDIS_DB,
"encoding": "utf-8",
"encoding_errors": "strict",
"decode_responses": False,
}
if dify_config.REDIS_USE_SENTINEL:
sentinel_hosts = [
(node.split(":")[0], int(node.split(":")[1])) for node in dify_config.REDIS_SENTINELS.split(",")
]
sentinel = Sentinel(
sentinel_hosts,
sentinel_kwargs={
"socket_timeout": dify_config.REDIS_SENTINEL_SOCKET_TIMEOUT,
"username": dify_config.REDIS_SENTINEL_USERNAME,
"password": dify_config.REDIS_SENTINEL_PASSWORD,
},
)
master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
redis_client.initialize(master)
else:
redis_params.update(
{
"host": dify_config.REDIS_HOST,
"port": dify_config.REDIS_PORT,
"connection_class": connection_class,
}
)
pool = redis.ConnectionPool(**redis_params)
redis_client.initialize(redis.Redis(connection_pool=pool))
app.extensions["redis"] = redis_client