dify/api/extensions/storage/opendal_storage.py
Yeuoly 899df30bf6
Plugin/merge main to plugin/beta 20250122 (#12962)
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: kurokobo <kuro664@gmail.com>
Co-authored-by: Hiroshi Fujita <fujita-h@users.noreply.github.com>
Co-authored-by: NFish <douxc512@gmail.com>
Co-authored-by: Gen Sato <52241300+halogen22@users.noreply.github.com>
Co-authored-by: eux <euxuuu@gmail.com>
Co-authored-by: huangzhuo1949 <167434202+huangzhuo1949@users.noreply.github.com>
Co-authored-by: huangzhuo <huangzhuo1@xiaomi.com>
Co-authored-by: lotsik <lotsik@mail.ru>
Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com>
Co-authored-by: Wu Tianwei <30284043+WTW0313@users.noreply.github.com>
Co-authored-by: nite-knite <nkCoding@gmail.com>
Co-authored-by: Jyong <76649700+JohnJyong@users.noreply.github.com>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: gakkiyomi <gakkiyomi@aliyun.com>
Co-authored-by: CN-P5 <heibai2006@gmail.com>
Co-authored-by: CN-P5 <heibai2006@qq.com>
Co-authored-by: Chuehnone <1897025+chuehnone@users.noreply.github.com>
Co-authored-by: yihong <zouzou0208@gmail.com>
Co-authored-by: Kevin9703 <51311316+Kevin9703@users.noreply.github.com>
Co-authored-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: Boris Feld <lothiraldan@gmail.com>
Co-authored-by: mbo <himabo@gmail.com>
Co-authored-by: mabo <mabo@aeyes.ai>
Co-authored-by: Warren Chen <warren.chen830@gmail.com>
Co-authored-by: KVOJJJin <jzongcode@gmail.com>
Co-authored-by: JzoNgKVO <27049666+JzoNgKVO@users.noreply.github.com>
Co-authored-by: jiandanfeng <chenjh3@wangsu.com>
Co-authored-by: zhu-an <70234959+xhdd123321@users.noreply.github.com>
Co-authored-by: zhaoqingyu.1075 <zhaoqingyu.1075@bytedance.com>
Co-authored-by: 海狸大師 <86974027+yenslife@users.noreply.github.com>
Co-authored-by: Xu Song <xusong.vip@gmail.com>
Co-authored-by: rayshaw001 <396301947@163.com>
Co-authored-by: Ding Jiatong <dingjiatong@gmail.com>
Co-authored-by: Bowen Liang <liangbowen@gf.com.cn>
Co-authored-by: JasonVV <jasonwangiii@outlook.com>
Co-authored-by: le0zh <newlight@qq.com>
Co-authored-by: zhuxinliang <zhuxinliang@didiglobal.com>
Co-authored-by: k-zaku <zaku99@outlook.jp>
Co-authored-by: Joel <iamjoel007@gmail.com>
Co-authored-by: luckylhb90 <luckylhb90@gmail.com>
Co-authored-by: hobo.l <hobo.l@binance.com>
Co-authored-by: jiangbo721 <365065261@qq.com>
Co-authored-by: 刘江波 <jiangbo721@163.com>
Co-authored-by: Shun Miyazawa <34241526+miya@users.noreply.github.com>
Co-authored-by: EricPan <30651140+Egfly@users.noreply.github.com>
Co-authored-by: crazywoola <427733928@qq.com>
Co-authored-by: zxhlyh <jasonapring2015@outlook.com>
Co-authored-by: sino <sino2322@gmail.com>
Co-authored-by: Jhvcc <37662342+Jhvcc@users.noreply.github.com>
Co-authored-by: lowell <lowell.hu@zkteco.in>
2025-01-23 14:48:16 +08:00

90 lines
3.3 KiB
Python

import logging
import os
from collections.abc import Generator
from pathlib import Path
import opendal # type: ignore[import]
from dotenv import dotenv_values
from extensions.storage.base_storage import BaseStorage
logger = logging.getLogger(__name__)
def _get_opendal_kwargs(*, scheme: str, env_file_path: str = ".env", prefix: str = "OPENDAL_"):
    """Collect OpenDAL operator options for ``scheme`` from the environment.

    Variables named ``<prefix><SCHEME>_<OPTION>`` are turned into
    ``{"<option>": value}`` entries (the option name is lower-cased).
    Process environment variables are read first; entries from the dotenv
    file only fill in options that are still missing, and only when their
    value is non-empty.

    Args:
        scheme: Service scheme whose upper-cased name is part of the variable prefix.
        env_file_path: Path handed to ``dotenv_values``.
        prefix: Leading part of the variable names.

    Returns:
        A dict mapping lower-cased option names to their string values.
    """
    config_prefix = prefix + scheme.upper() + "_"
    strip = len(config_prefix)

    # Pass 1: the process environment always wins.
    options = {
        name[strip:].lower(): setting
        for name, setting in os.environ.items()
        if name.startswith(config_prefix)
    }

    # Pass 2: the dotenv file only supplies non-empty values for options
    # that have not been set yet.
    for name, setting in (dotenv_values(env_file_path) or {}).items():
        if not setting or not name.startswith(config_prefix):
            continue
        options.setdefault(name[strip:].lower(), setting)

    return options
class OpenDALStorage(BaseStorage):
    """Storage backend that forwards file operations to an OpenDAL operator."""

    def __init__(self, scheme: str, **kwargs):
        """Build the OpenDAL operator for ``scheme`` and add a retry layer to it.

        Args:
            scheme: OpenDAL service scheme passed to ``opendal.Operator``.
            **kwargs: Operator options. When none are supplied, they are
                collected by ``_get_opendal_kwargs`` for the given scheme.
        """
        kwargs = kwargs or _get_opendal_kwargs(scheme=scheme)

        if scheme == "fs":
            # For the "fs" scheme, create the root directory up front
            # (defaults to "storage" when no root option is configured).
            root = kwargs.get("root", "storage")
            Path(root).mkdir(parents=True, exist_ok=True)

        self.op = opendal.Operator(scheme=scheme, **kwargs)  # type: ignore
        logger.debug(f"opendal operator created with scheme {scheme}")
        retry_layer = opendal.layers.RetryLayer(max_times=3, factor=2.0, jitter=True)
        self.op = self.op.layer(retry_layer)
        logger.debug("added retry layer to opendal operator")

    def save(self, filename: str, data: bytes) -> None:
        """Write ``data`` to ``filename`` through the operator."""
        self.op.write(path=filename, bs=data)
        logger.debug(f"file {filename} saved")

    def load_once(self, filename: str) -> bytes:
        """Read the whole file and return its content.

        Raises:
            FileNotFoundError: If ``exists(filename)`` is false.
        """
        if not self.exists(filename):
            raise FileNotFoundError("File not found")

        content: bytes = self.op.read(path=filename)
        logger.debug(f"file {filename} loaded")
        return content

    def load_stream(self, filename: str) -> Generator:
        """Yield the file content in chunks of up to 4096 bytes.

        This is a generator function, so the existence check runs when the
        first chunk is requested, not when the method is called.

        Raises:
            FileNotFoundError: If ``exists(filename)`` is false.
        """
        if not self.exists(filename):
            raise FileNotFoundError("File not found")

        batch_size = 4096
        # The context manager closes the reader when the stream is exhausted,
        # when a read fails, or when the consumer closes the generator early;
        # previously the reader was never closed.
        with self.op.open(path=filename, mode="rb") as file:
            while chunk := file.read(batch_size):
                yield chunk
        logger.debug(f"file {filename} loaded as stream")

    def download(self, filename: str, target_filepath: str):
        """Copy the content of ``filename`` into the local file ``target_filepath``.

        Raises:
            FileNotFoundError: If ``exists(filename)`` is false.
        """
        if not self.exists(filename):
            raise FileNotFoundError("File not found")

        with Path(target_filepath).open("wb") as f:
            f.write(self.op.read(path=filename))
        logger.debug(f"file {filename} downloaded to {target_filepath}")

    def exists(self, filename: str) -> bool:
        """Return True when ``filename`` can be stat-ed and is a regular file.

        Any exception raised while stat-ing the path is treated as
        "does not exist" and results in ``False``.
        """
        # FIXME: this is a workaround from when the opendal Python binding had
        # no `exists` method and offered no better way to tell "not found"
        # apart from other errors. Once the binding provides `exists`, use it.
        # See https://github.com/apache/opendal/blob/main/bindings/python/src/operator.rs
        try:
            res: bool = self.op.stat(path=filename).mode.is_file()
            logger.debug(f"file {filename} checked")
            return res
        except Exception:
            return False

    def delete(self, filename: str):
        """Delete ``filename`` if it exists; otherwise do nothing."""
        if self.exists(filename):
            self.op.delete(path=filename)
            logger.debug(f"file {filename} deleted")
            return
        logger.debug(f"file {filename} not found, skip delete")