centos安装
手动安装高版本 containerd.io
下载最新版 containerd.io RPM 包
wget https://mirrors.aliyun.com/docker-ce/linux/centos/7/x86_64/stable/Packages/containerd.io-1.6.33-3.1.el7.x86_64.rpm
强制安装依赖包
sudo yum install -y ./containerd.io-*.rpm --skip-broken
配置国内 Docker 仓库
移除官方仓库(避免冲突)
sudo rm -f /etc/yum.repos.d/docker-ce*.repo
添加阿里云镜像源
sudo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
更新仓库缓存
sudo yum clean all && sudo yum makecache
安装 Docker CE
指定版本安装(规避依赖冲突)
# 查看可用版本
sudo yum list docker-ce --showduplicates | sort -r
# 安装兼容版本(示例)
sudo yum install -y docker-ce-26.1.4 docker-ce-cli-26.1.4 containerd.io
配置镜像加速
vim /etc/docker/daemon.json
{
"registry-mirrors": [
"https://docker.1ms.run",
"https://xxxxx.mirror.aliyuncs.com"
]
}
重启docker
systemctl restart docker
安装dify
下载源码
git clone https://github.com/langgenius/dify.git
cd dify
cd docker
cp .env.example .env
docker compose up -d
修改 .env 配置
NGINX_PORT=8080
NGINX_SSL_PORT=30443
EXPOSE_NGINX_PORT=8080
EXPOSE_NGINX_SSL_PORT=40443
docker compose down && docker compose up -d
如果要二开,需要 本地源码部署
安装ollma
curl -fsSL https://ollama.com/install.sh | sh
如果网速慢,本地下载好以后上传到服务器
https://github.com/ollama/ollama/releases/tag/v0.6.3
下载 ollama-linux-amd64.tgz
上传到服务器并解压, tar -xzf ollama-linux-amd64.tgz
启动服务,ollama serve
dify重置密码
运行: docker exec -it docker-api-1 flask reset-password
输入邮箱和密码
连接数据库
docker exec -it docker-db-1 psql -U postgres -d dify
UPDATE accounts SET failed_login_attempts = 0 WHERE email = 'your_email@example.com';
docker exec -it docker-redis-1 redis-cli
KEYS login_attempts
删除掉
更新dify
cd dify/docker
docker compose down
git pull origin main
docker compose pull
docker compose up -d
使用pyenv管理多版本
安装依赖包
yum install -y zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel libpcap-devel xz-devel
sudo yum groupinstall "Development Tools" -y
sudo yum install -y \ zlib-devel bzip2-devel openssl-devel ncurses-devel \ sqlite-devel readline-devel tk-devel libffi-devel \ xz-devel gdbm-devel db4-devel libpcap-devel
通过脚本自动安装并配置环境变量
vim ~/.bashrc 或者 vim ~/.bash_profile
export PYENV_ROOT="$HOME/.pyenv"
[[ -d $PYENV_ROOT/bin ]] && export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init - bash)"
eval "$(pyenv virtualenv-init -)"
source ~/.bashrc 或者 source ~/.bash_profile
pyenv install 3.11.0
全局切换版本,举例: pyenv global 3.11.0
局部切换
cd /path/to/project
pyenv local 3.11.0
两个项目,如果python版本相同,那么pip安装包是不隔离的,可以使用以下方式隔离
安装 pyenv-virtualenv 插件
git clone https://github.com/pyenv/pyenv-virtualenv.git $(pyenv root)/plugins/pyenv-virtualenv
windows中这个命令可能无效
pyenv which python 找到$(pyenv root),然后直接填入绝对路径
C:\Users\admin.pyenv\pyenv-win\versions\3.12.9\python.exe
git clone https://github.com/pyenv/pyenv-virtualenv.git C:\Users\admin.pyenv\pyenv-win\plugins\pyenv-virtualenv
# 在目录 A 中
pyenv virtualenv 3.11.0 projectA-env # 创建虚拟环境
pyenv local projectA-env # 绑定到目录 A
# 在目录 B 中
pyenv virtualenv 3.11.0 projectB-env # 创建另一个虚拟环境
pyenv local projectB-env # 绑定到目录 B
windows中如果 pyenv virtualenv 3.12.0 test_env 无法找到 virtualenv
使用 python 自带的 virtualenv
安装
pip install virtualenv
创建env环境
python -m virtualenv myenv
激活
.\myenv\Scripts\activate
dify文档
dify插件开发
python环境需 >= 3.12 pyenv install 3.12.0
pycharm引入python路径 ~/.pyenv/versions/3.12.x/bin/python3.12
下载客户端 https://github.com/langgenius/dify-plugin-daemon/releases
chmod +x dify-plugin-darwin-amd64
./dify-plugin-darwin-amd64 version
./dify-plugin-darwin-amd64 plugin init
根据提示填写相关信息, 使用 TAB建 可以进行勾选
Storage Size 根据实际情况调节, 比如: 104857600 100M
如果是上传文件的场景,扩大到100M了,那么 .env 配置同步扩大下
# 允许上传 50MB 文件
NGINX_CLIENT_MAX_BODY_SIZE=50M
# 插件包体积限制
PLUGIN_MAX_PACKAGE_SIZE=104857600
打开 /provider 下的 xxx.yaml 也可以自行重命名
pip install dify_plugin
编写相关逻辑
发布插件
./dify-plugin-darwin-amd64 plugin package ./your_plugin_dir
Xorbits Inference
pip install 安装报错的话,使用 docker 安装,latest可以替换为最新的稳定版本
docker pull registry.cn-hangzhou.aliyuncs.com/xprobe_xinference/xinference:latest
openwebui连接deepseek
"""
title: DeepSeek R1
author: zgccrui
description: 在OpwenWebUI中显示DeepSeek R1模型的思维链 - 仅支持0.5.6及以上版本
version: 1.2.16
licence: MIT
"""
import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
import asyncio
import traceback
class Pipe:
class Valves(BaseModel):
DEEPSEEK_API_BASE_URL: str = Field(
default="https://api.deepseek.com/v1",
description="DeepSeek API的基础请求地址",
)
DEEPSEEK_API_KEY: str = Field(
default="", description="用于身份验证的DeepSeek API密钥,可从控制台获取"
)
DEEPSEEK_API_MODEL: str = Field(
default="deepseek-reasoner",
description="API请求的模型名称,默认为 deepseek-reasoner,多模型名可使用`,`分隔",
)
def __init__(self):
self.valves = self.Valves()
self.data_prefix = "data:"
self.emitter = None
def pipes(self):
models = self.valves.DEEPSEEK_API_MODEL.split(",")
return [
{
"id": model.strip(),
"name": model.strip(),
}
for model in models
]
async def pipe(
self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
) -> AsyncGenerator[str, None]:
"""主处理管道(已移除缓冲)"""
thinking_state = {"thinking": -1} # 用于存储thinking状态
self.emitter = __event_emitter__
# 用于存储联网模式下返回的参考资料列表
stored_references = []
# 联网搜索供应商 0-无 1-火山引擎 2-PPLX引擎 3-硅基流动
search_providers = 0
waiting_for_reference = False
# 用于处理硅基的 [citation:1] 的栈
citation_stack_reference = [
"[",
"c",
"i",
"t",
"a",
"t",
"i",
"o",
"n",
":",
"",
"]",
]
citation_stack = []
# 临时保存的未处理的字符串
unprocessed_content = ""
# 验证配置
if not self.valves.DEEPSEEK_API_KEY:
yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
return
# 准备请求参数
headers = {
"Authorization": f"Bearer {self.valves.DEEPSEEK_API_KEY}",
"Content-Type": "application/json",
}
try:
# 模型ID提取
model_id = body["model"].split(".", 1)[-1]
payload = {**body, "model": model_id}
# 处理消息以防止连续的相同角色
messages = payload["messages"]
i = 0
while i < len(messages) - 1:
if messages[i]["role"] == messages[i + 1]["role"]:
# 插入具有替代角色的占位符消息
alternate_role = (
"assistant" if messages[i]["role"] == "user" else "user"
)
messages.insert(
i + 1,
{"role": alternate_role, "content": "[Unfinished thinking]"},
)
i += 1
# 发起API请求
async with httpx.AsyncClient(http2=True) as client:
async with client.stream(
"POST",
f"{self.valves.DEEPSEEK_API_BASE_URL}/chat/completions",
json=payload,
headers=headers,
timeout=300,
) as response:
# 错误处理
if response.status_code != 200:
error = await response.aread()
yield self._format_error(response.status_code, error)
return
# 流式处理响应
async for line in response.aiter_lines():
if not line.startswith(self.data_prefix):
continue
# 截取 JSON 字符串
json_str = line[len(self.data_prefix) :].strip()
# 去除首尾空格后检查是否为结束标记
if json_str == "[DONE]":
return
try:
data = json.loads(json_str)
except json.JSONDecodeError as e:
error_detail = f"解析失败 - 内容:{json_str},原因:{e}"
yield self._format_error("JSONDecodeError", error_detail)
return
if search_providers == 0:
# 检查 delta 中的搜索结果
choices = data.get("choices")
if not choices or len(choices) == 0:
continue # 跳过没有 choices 的数据块
delta = choices[0].get("delta", {})
if delta.get("type") == "search_result":
search_results = delta.get("search_results", [])
if search_results:
ref_count = len(search_results)
yield '<details type="search">\n'
yield f"<summary>已搜索 {ref_count} 个网站</summary>\n"
for idx, result in enumerate(search_results, 1):
yield f'> {idx}. [{result["title"]}]({result["url"]})\n'
yield "</details>\n"
search_providers = 3
stored_references = search_results
continue
# 处理参考资料
stored_references = data.get("references", []) + data.get(
"citations", []
)
if stored_references:
ref_count = len(stored_references)
yield '<details type="search">\n'
yield f"<summary>已搜索 {ref_count} 个网站</summary>\n"
# 如果data中有references,则说明是火山引擎的返回结果
if data.get("references"):
for idx, reference in enumerate(stored_references, 1):
yield f'> {idx}. [{reference["title"]}]({reference["url"]})\n'
yield "</details>\n"
search_providers = 1
# 如果data中有citations,则说明是PPLX引擎的返回结果
elif data.get("citations"):
for idx, reference in enumerate(stored_references, 1):
yield f"> {idx}. {reference}\n"
yield "</details>\n"
search_providers = 2
# 方案 A: 检查 choices 是否存在且非空
choices = data.get("choices")
if not choices or len(choices) == 0:
continue # 跳过没有 choices 的数据块
choice = choices[0]
# 结束条件判断
if choice.get("finish_reason"):
return
# 状态机处理
state_output = await self._update_thinking_state(
choice.get("delta", {}), thinking_state
)
if state_output:
yield state_output
if state_output == "<think>":
yield "\n"
# 处理并立即发送内容
content = self._process_content(choice["delta"])
if content:
# 处理思考状态标记
if content.startswith("<think>"):
content = re.sub(r"^<think>", "", content)
yield "<think>"
await asyncio.sleep(0.1)
yield "\n"
elif content.startswith("</think>"):
content = re.sub(r"^</think>", "", content)
yield "</think>"
await asyncio.sleep(0.1)
yield "\n"
# 处理参考资料
if search_providers == 1:
# 火山引擎的参考资料处理
# 如果文本中包含"摘要",设置等待标志
if "摘要" in content:
waiting_for_reference = True
yield content
continue
# 如果正在等待参考资料的数字
if waiting_for_reference:
# 如果内容仅包含数字或"、"
if re.match(r"^(\d+|、)$", content.strip()):
numbers = re.findall(r"\d+", content)
if numbers:
num = numbers[0]
ref_index = int(num) - 1
if 0 <= ref_index < len(stored_references):
ref_url = stored_references[ref_index][
"url"
]
else:
ref_url = ""
content = f"[[{num}]]({ref_url})"
# 保持等待状态继续处理后续数字
# 如果遇到非数字且非"、"的内容且不含"摘要",停止等待
elif not "摘要" in content:
waiting_for_reference = False
elif search_providers == 2:
# PPLX引擎的参考资料处理
def replace_ref(m):
idx = int(m.group(1)) - 1
if 0 <= idx < len(stored_references):
return f"[[{m.group(1)}]]({stored_references[idx]})"
return f"[[{m.group(1)}]]()"
content = re.sub(r"\[(\d+)\]", replace_ref, content)
elif search_providers == 3:
skip_outer = False
if len(unprocessed_content) > 0:
content = unprocessed_content + content
unprocessed_content = ""
for i in range(len(content)):
# 检查 content[i] 是否可访问
if i >= len(content):
break
# 检查 citation_stack_reference[len(citation_stack)] 是否可访问
if len(citation_stack) >= len(
citation_stack_reference
):
break
if (
content[i]
== citation_stack_reference[len(citation_stack)]
):
citation_stack.append(content[i])
# 如果 citation_stack 的位数等于 citation_stack_reference 的位数,则修改为 URL 格式返回
if len(citation_stack) == len(
citation_stack_reference
):
# 检查 citation_stack[10] 是否可访问
if len(citation_stack) > 10:
ref_index = int(citation_stack[10]) - 1
# 检查 stored_references[ref_index] 是否可访问
if (
0
<= ref_index
< len(stored_references)
):
ref_url = stored_references[
ref_index
]["url"]
else:
ref_url = ""
# 将content中剩余的部分保存到unprocessed_content中
unprocessed_content = "".join(
content[i + 1 :]
)
content = f"[[{citation_stack[10]}]]({ref_url})"
citation_stack = []
skip_outer = False
break
else:
skip_outer = True
elif (
citation_stack_reference[len(citation_stack)]
== ""
):
# 判断是否为数字
if content[i].isdigit():
citation_stack.append(content[i])
skip_outer = True
else:
# 将 citation_stack 中全部元素拼接成字符串
content = "".join(citation_stack) + content
citation_stack = []
elif (
citation_stack_reference[len(citation_stack)]
== "]"
):
# 判断前一位是否为数字
if citation_stack[-1].isdigit():
citation_stack[-1] += content[i]
skip_outer = True
else:
content = "".join(citation_stack) + content
citation_stack = []
else:
if len(citation_stack) > 0:
# 将 citation_stack 中全部元素拼接成字符串
content = "".join(citation_stack) + content
citation_stack = []
if skip_outer:
continue
yield content
except Exception as e:
yield self._format_exception(e)
async def _update_thinking_state(self, delta: dict, thinking_state: dict) -> str:
"""更新思考状态机(简化版)"""
state_output = ""
if thinking_state["thinking"] == -1 and delta.get("reasoning_content"):
thinking_state["thinking"] = 0
state_output = "<think>"
elif (
thinking_state["thinking"] == 0
and not delta.get("reasoning_content")
and delta.get("content")
):
thinking_state["thinking"] = 1
state_output = "\n</think>\n\n"
return state_output
def _process_content(self, delta: dict) -> str:
"""直接返回处理后的内容"""
return delta.get("reasoning_content", "") or delta.get("content", "")
def _emit_status(self, description: str, done: bool = False) -> Awaitable[None]:
"""发送状态更新"""
if self.emitter:
return self.emitter(
{
"type": "status",
"data": {
"description": description,
"done": done,
},
}
)
return None
def _format_error(self, status_code: int, error: bytes) -> str:
if isinstance(error, str):
error_str = error
else:
error_str = error.decode(errors="ignore")
try:
err_msg = json.loads(error_str).get("message", error_str)[:200]
except Exception:
err_msg = error_str[:200]
return json.dumps(
{"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
)
def _format_exception(self, e: Exception) -> str:
tb_lines = traceback.format_exception(type(e), e, e.__traceback__)
detailed_error = "".join(tb_lines)
return json.dumps({"error": detailed_error}, ensure_ascii=False)