项目概述
NexusDB 是一个基于 PyQt5 的桌面应用程序,集成了 AI 辅助功能,用于生成和执行 MongoDB 查询。它提供了一个直观的图形界面,允许用户输入中文问题,自动生成优化的 MongoDB 查询,并支持多轮筛选和文档分析。
核心功能
- 智能查询生成: 使用 AI 将自然语言问题转换为 MongoDB 查询
- 黑名单管理: 支持设置不希望出现在查询中的词汇
- 多阶段处理: 支持首次查询、内容过滤、独立过滤等多个处理阶段
- 实时结果展示: 即时显示查询结果和处理状态
- HTTP API: 基于 FastAPI 提供创建查询链、查询链状态等 Web 服务接口
架构设计
三层架构
项目采用经典的三层架构设计:
# 架构层次
1. GUI 层 (Presentation Layer) - main.py
2. 业务逻辑层 (Business Logic Layer) - Core/ 目录
3. 工具层 (Tool Layer) - Tool/ 目录
核心模块关系
main.py (GUI) → Core/QuestionBuilder.py → Tool/DatabaseConnector.py → MongoDB
核心模块详解
1. GUI 模块 (main.py)
PyQt5 界面设计方法:
class DataInputWindow(QMainWindow):
    """Main window: question and blacklist input on the left, results on the right."""

    def __init__(self):
        super().__init__()
        self.setup_ui()

    def setup_ui(self):
        """Build the two-column layout inside the central widget."""
        root = QWidget()
        self.setCentralWidget(root)
        columns = QHBoxLayout(root)

        input_panel = self._build_input_panel()
        result_panel = self._build_result_panel()

        # Stretch factors 1:2 give the result column twice the width of the input column.
        columns.addWidget(input_panel, 1)
        columns.addWidget(result_panel, 2)

    def _build_input_panel(self):
        """Create the left column: question editor, blacklist area and the process button."""
        panel = QWidget()
        column = QVBoxLayout(panel)

        self.question_input = QTextEdit()
        self.question_input.setPlaceholderText("请输入您的问题...")
        column.addWidget(QLabel("问题输入:"))
        column.addWidget(self.question_input)

        # Starts empty; blacklist word widgets are presumably added elsewhere (not shown here).
        self.banned_list = QVBoxLayout()
        column.addWidget(QLabel("黑名单词汇:"))
        column.addLayout(self.banned_list)

        self.process_btn = QPushButton("处理")
        # handle_process is not defined in this snippet; it must exist on the class.
        self.process_btn.clicked.connect(self.handle_process)
        column.addWidget(self.process_btn)
        return panel

    def _build_result_panel(self):
        """Create the right column: a read-only text area for processing output."""
        panel = QWidget()
        column = QVBoxLayout(panel)

        self.result_display = QTextEdit()
        self.result_display.setReadOnly(True)
        column.addWidget(QLabel("处理结果:"))
        column.addWidget(self.result_display)
        return panel
2. AI 查询生成器 (Core/QuestionBuilder.py)
基础查询构造器类:
class BaseQuestionConstructor(ABC):
    """Base class for AI-backed query builders.

    Subclasses supply the prompt through ``_generate_prompt``. This class sends
    the prompt to the chat model and parses the reply into a dict.
    """

    # Chat-completion settings. They are class attributes so a subclass (or a
    # caller) can override them without touching _get_ai_back.
    model = "ep-20250731092606-cgqmr"
    temperature = 0.1
    max_tokens = 2000

    def __init__(self, core_issue: str, banned_words: List[str] = None):
        """
        Args:
            core_issue: the user's question.
            banned_words: words to keep out of the generated query; None means
                none. The list is copied, so later changes by the caller do not
                leak into this builder.
        """
        self.core_issue = core_issue
        self.banned_words = list(banned_words) if banned_words else []
        self.robot = ArkChatClient()

    @abstractmethod
    def _generate_prompt(self) -> str:
        """Return the prompt text to send to the model."""

    def _get_ai_back(self, prompt: str) -> str:
        """Send *prompt* to the model and return the reply text.

        Errors are not raised. On any exception the message is returned as a
        string prefixed with "AI 服务错误", which _parse_response maps to {}.
        """
        try:
            response = self.robot.chat_completions_create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=self.temperature,
                max_tokens=self.max_tokens,
            )
            # The message content can be None; normalise it so parsing never sees None.
            return response.choices[0].message.content or ""
        except Exception as e:
            return f"AI 服务错误: {e}"

    def _parse_response(self, ai_response: str) -> Dict:
        """Extract the JSON object from the model reply.

        Models often wrap JSON in a ```json fence or surround it with prose.
        This method therefore decodes the span from the first "{" to the last "}".

        Returns:
            The decoded object, or {} when no JSON object can be decoded. This
            includes empty replies and the error string from _get_ai_back.
            Subclasses may override this method.
        """
        import json

        if not ai_response:
            return {}
        start = ai_response.find("{")
        end = ai_response.rfind("}")
        if start == -1 or end < start:
            return {}
        try:
            parsed = json.loads(ai_response[start:end + 1])
        except json.JSONDecodeError:
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def construct_question(self) -> Dict:
        """Generate the prompt, query the model, and return the parsed reply."""
        prompt = self._generate_prompt()
        ai_response = self._get_ai_back(prompt)
        return self._parse_response(ai_response)
首次查询构造器实现:
class FirstQuestionConstructor(BaseQuestionConstructor):
    """First-round builder: turns the Chinese question into English keywords and a MongoDB filter."""

    def _generate_prompt(self) -> str:
        """Return the prompt for the first query.

        The blacklist line, and the exclusion clause in the example output, are
        included only when banned words exist. Otherwise the model would be
        shown a placeholder "banned_pattern" filter that it might copy.
        """
        banned_line = "排除词汇: " + ", ".join(self.banned_words) if self.banned_words else ""
        # The exclusion example uses "$options": "i" like the positive matches.
        # Without it, exclusion would be case-sensitive while matching is not.
        # NOTE(review): the example excludes banned words from "title" only, while
        # matches also cover "info" and "codeOrCase". Confirm this is intended.
        exclusion_example = (
            ',\n"title": {"$not": {"$regex": "banned_pattern", "$options": "i"}}'
            if self.banned_words
            else ""
        )
        prompt = f"""将以下中文问题转换为英文关键词,并生成MongoDB查询:
原始问题: {self.core_issue}
{banned_line}
要求:
1. 提取核心英文关键词
2. 生成MongoDB查询JSON
3. 使用$regex进行模糊匹配
4. 使用$options: "i"忽略大小写
5. 使用$or操作符覆盖多个字段
6. 排除黑名单词汇
7. 只返回JSON本身,不要附加解释或代码块标记
返回格式:
{{
"english_keywords": ["keyword1", "keyword2"],
"mongodb_query": {{
"$or": [
{{"title": {{"$regex": "pattern", "$options": "i"}}}},
{{"info": {{"$regex": "pattern", "$options": "i"}}}},
{{"codeOrCase": {{"$regex": "pattern", "$options": "i"}}}}
]{exclusion_example}
}}
}}
"""
        return prompt
3. 数据库连接器 (Tool/DatabaseConnector.py)
MongoDB 查询执行方法:
class DatabaseConnector:
    """Thin wrapper around a MongoClient bound to one database.

    Query errors are printed and turned into empty lists. Callers therefore
    cannot tell "no matches" apart from "query failed".
    """

    def __init__(self, uri: str = "mongodb://localhost:27017/", db_name: str = "dbNewSave"):
        """
        Args:
            uri: MongoDB connection string. The default is the previously
                hard-coded local server; pass Config().mongodb_uri to honour
                the environment.
            db_name: database to use (default "dbNewSave").
        """
        self.client = MongoClient(uri)
        self.db = self.client[db_name]

    def close(self) -> None:
        """Close the underlying MongoClient."""
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def check_db_collection(self, collection_name: str, query: Dict, limit: int = 0) -> List[Dict]:
        """Run *query* against *collection_name* and return the matching documents.

        Args:
            limit: maximum number of documents to return. 0 (the default) means
                no limit, as in pymongo's ``find``.

        Returns:
            The matching documents, or [] if the query raised.
        """
        try:
            return list(self.db[collection_name].find(query, limit=limit))
        except Exception as e:
            print(f"数据库查询错误: {e}")
            return []

    def get_docs_by_object_ids(self, collection_name: str, object_ids: List[str]) -> List[Dict]:
        """Fetch the documents whose ``_id`` is in *object_ids* (ObjectId hex strings).

        Invalid ids are skipped instead of discarding the whole batch.

        Returns:
            The found documents. Returns [] when no valid id remains or the
            query raises.
        """
        valid_ids = [ObjectId(oid) for oid in object_ids or [] if ObjectId.is_valid(oid)]
        if not valid_ids:
            return []
        try:
            return list(self.db[collection_name].find({"_id": {"$in": valid_ids}}))
        except Exception as e:
            print(f"ObjectId查询错误: {e}")
            return []
API 服务开发
FastAPI Web 服务 (Tool/fastAPItest.py)
RESTful API 实现(节选,ChainCreateRequest 等请求模型的定义未列出):
# FastAPI application that the endpoints below are registered on.
app = FastAPI()
# In-process registry mapping chain_id -> ChainManager. It is not persisted anywhere.
chain_managers = {}
# Taken by chain_creator when inserting into chain_managers.
chain_lock = threading.Lock()
class ChainManager:
    """State of one query chain, identified by a random UUID string."""

    def __init__(self, core_issue: str, banned_words: List[str]):
        # Identity plus the user's input, stored as given.
        self.chain_id = str(uuid.uuid4())
        self.core_issue, self.banned_words = core_issue, banned_words
        # Progress: every chain starts "waiting_order" at epoch 1 with nothing recorded yet.
        self.current_status = "waiting_order"
        self.epoch = 1
        self.current_step = self.history_answer = self.key_point = ""
@app.post("/chain_creator")
async def chain_creator(request: ChainCreateRequest):
    """Register a new chain for the request's question and blacklist; respond with its id."""
    new_chain = ChainManager(request.core_issue, request.banned_words)
    # Only the shared registry needs the lock; building the manager touches no shared state.
    with chain_lock:
        chain_managers[new_chain.chain_id] = new_chain
    return {"chain_id": new_chain.chain_id}
@app.get("/get_chain_info/{chain_id}")
async def get_chain_info(chain_id: str):
    """Return a snapshot of the chain identified by *chain_id*.

    ``key_point`` is revealed only once the chain's status is
    "secondary_analyze" or "end"; before that it is None.

    Raises:
        HTTPException: 404 when no chain with that id is registered.
    """
    # Single lookup under the same lock chain_creator uses for writes. The old
    # `in` check followed by indexing was two unsynchronised registry reads.
    with chain_lock:
        chain = chain_managers.get(chain_id)
    if chain is None:
        raise HTTPException(status_code=404, detail="Chain not found")
    return {
        "current_status": chain.current_status,
        "core_issue": chain.core_issue,
        "banned_words": chain.banned_words,
        "chain_id": chain.chain_id,
        "epoch": chain.epoch,
        "current_step": chain.current_step,
        "history_answer": chain.history_answer,
        "key_point": chain.key_point if chain.current_status in ["secondary_analyze", "end"] else None,
    }
查询处理流程
多步骤处理机制
# 完整的查询处理流程
def full_processing_flow(core_issue: str, banned_words: List[str], collection_name: str,
                         max_results: int = 9) -> str:
    """Run the whole pipeline for one question and return a summary string.

    Pipeline:
    1. AI-generated first query.
    2. MongoDB lookup.
    3. Content filter, run only when more than *max_results* documents come back.
    4. Isolated filter.
    5. Summary of the first remaining document.

    Args:
        core_issue: the user's question.
        banned_words: words to keep out of the query.
        collection_name: MongoDB collection to search.
        max_results: result count above which the content filter runs. The
            default of 9 matches the old hard-coded threshold and Config's
            MAX_RESULTS default.

    Returns:
        The summary of the top document, or "未找到相关结果" when nothing remains.

    Raises:
        ValueError: the AI reply contained no usable "mongodb_query".
    """
    first_query = FirstQuestionConstructor(core_issue, banned_words).construct_question()
    mongo_query = first_query.get("mongodb_query") if isinstance(first_query, dict) else None
    # A missing or empty filter must never reach find(): {} matches every document.
    if not isinstance(mongo_query, dict) or not mongo_query:
        raise ValueError(f"AI response has no usable mongodb_query: {first_query!r}")

    initial_results = DatabaseConnector().check_db_collection(collection_name, mongo_query)

    if len(initial_results) > max_results:
        content_filter = ContentFilterConstructor(core_issue, banned_words)
        filtered_results = content_filter.filter_results(initial_results)
    else:
        filtered_results = initial_results

    isolated_filter = IsolatedFilterConstructor(core_issue, banned_words)
    final_results = isolated_filter.filter_results(filtered_results)

    if not final_results:
        return "未找到相关结果"
    return HTMLDocumentSummarizer().generate_summary(final_results[0])
AI 集成方法
多AI服务支持
class ArkChatClient:
    """OpenAI-SDK client pointed at the Volcengine Ark chat endpoint."""

    DEFAULT_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"

    def __init__(self, api_key=None, base_url=None):
        """
        Args:
            api_key: Ark API key. Falls back to $ARK_CHAT_API_KEY, then to the
                placeholder "default-key". That placeholder is presumably
                rejected by the server, so a missing key surfaces only when a
                request is made.
            base_url: overrides DEFAULT_BASE_URL, e.g. to target another region.
        """
        self.base_url = base_url or self.DEFAULT_BASE_URL
        self.api_key = api_key or os.getenv("ARK_CHAT_API_KEY", "default-key")
        self.client = OpenAI(
            base_url=self.base_url,
            api_key=self.api_key,
        )

    def chat_completions_create(self, **kwargs):
        """Forward every keyword argument unchanged to ``client.chat.completions.create``."""
        return self.client.chat.completions.create(**kwargs)
class SiliconFlowClient:
    """OpenAI-SDK client pointed at the SiliconFlow API, with the same call interface as ArkChatClient."""

    DEFAULT_BASE_URL = "https://api.siliconflow.cn/v1"
    DEFAULT_MODEL = "Qwen/Qwen3-235B-A22B-Thinking-2507"

    def __init__(self, api_key=None, base_url=None):
        """
        Args:
            api_key: SiliconFlow API key. Falls back to $SILICONFLOW_API_KEY
                (None if unset).
            base_url: overrides DEFAULT_BASE_URL.
        """
        self.base_url = base_url or self.DEFAULT_BASE_URL
        self.api_key = api_key or os.getenv("SILICONFLOW_API_KEY")
        self.client = OpenAI(
            base_url=self.base_url,
            api_key=self.api_key,
        )

    def chat_completions_create(self, **kwargs):
        """Send a chat completion using only model, messages, temperature and max_tokens.

        Any other keyword argument (e.g. top_p, stream) is dropped. ``messages``
        is required (KeyError otherwise).
        """
        # NOTE(review): a caller-supplied model is forwarded as-is. An Ark
        # endpoint id (as used by BaseQuestionConstructor) will not be a valid
        # SiliconFlow model name.
        adapted_kwargs = {
            "model": kwargs.get("model", self.DEFAULT_MODEL),
            "messages": kwargs["messages"],
            "temperature": kwargs.get("temperature", 0.1),
            "max_tokens": kwargs.get("max_tokens", 2000),
        }
        return self.client.chat.completions.create(**adapted_kwargs)
并发处理与状态管理
线程安全的链管理(带锁版本的 ChainManager):
class ChainManager:
    """Query-chain state whose status and result list are updated under a lock.

    It carries the same progress fields that the /get_chain_info endpoint
    reads (epoch, current_step, history_answer, key_point), so instances of
    this class can be served by that endpoint.
    """

    def __init__(self, core_issue: str, banned_words: List[str]):
        self.chain_id = str(uuid.uuid4())
        self.core_issue = core_issue
        self.banned_words = banned_words
        self.current_status = "waiting_order"
        # Progress fields read by get_chain_info. Without them, that endpoint
        # raises AttributeError for managers of this class.
        self.epoch = 1
        self.current_step = ""
        self.history_answer = ""
        self.key_point = ""
        # Guards current_status and results in the methods below.
        self.lock = threading.Lock()
        self.results = []

    def update_status(self, new_status: str):
        """Set current_status while holding the lock."""
        with self.lock:
            self.current_status = new_status

    def add_results(self, new_results: List[Dict]):
        """Append *new_results* to the stored results while holding the lock."""
        with self.lock:
            self.results.extend(new_results)

    def get_results(self) -> List[Dict]:
        """Return a shallow copy of the stored results, so callers cannot mutate the internal list."""
        with self.lock:
            return self.results.copy()
开发最佳实践
1. 错误处理模式
def safe_database_operation(func):
    """Decorator: call *func* and return [] instead of raising.

    Every Exception is printed and swallowed. Note that pymongo's own errors
    (ConnectionFailure, ServerSelectionTimeoutError, ...) derive from
    pymongo.errors.PyMongoError, not from the builtin ConnectionError or
    TimeoutError, so they are reported by the generic branch.
    """
    import functools

    # wraps keeps the decorated function's __name__/__doc__ for logs and introspection.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ConnectionError as e:
            print(f"数据库连接错误: {e}")
            return []
        except TimeoutError as e:
            print(f"查询超时: {e}")
            return []
        except Exception as e:
            print(f"未知错误: {e}")
            return []

    return wrapper
@safe_database_operation
def execute_query(collection_name: str, query: Dict) -> List[Dict]:
    """Return every document of *collection_name* that matches *query*; the decorator maps errors to []."""
    # `db` is a module-level database handle assumed to be defined elsewhere (not shown here).
    cursor = db[collection_name].find(query)
    return list(cursor)
2. 配置管理
class Config:
    """Process-wide settings singleton, populated from environment variables on first construction."""

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            instance = super().__new__(cls)
            # Load before publishing the instance. If load_config raises (e.g. a
            # non-integer MAX_RESULTS), no half-initialised object is cached and
            # handed to later callers.
            instance.load_config()
            cls._instance = instance
        return cls._instance

    def load_config(self):
        """Read settings from the environment, falling back to local-development defaults.

        Raises:
            ValueError: MAX_RESULTS is set but is not an integer.
        """
        self.mongodb_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017/")
        self.database_name = os.getenv("DATABASE_NAME", "dbNewSave")
        self.ai_provider = os.getenv("AI_PROVIDER", "ark_chat")
        raw_max_results = os.getenv("MAX_RESULTS", "9")
        try:
            self.max_results = int(raw_max_results)
        except ValueError:
            raise ValueError(f"MAX_RESULTS must be an integer, got {raw_max_results!r}") from None
3. 日志记录
import logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: the log messages are Chinese, and the locale-dependent
        # default file encoding (e.g. cp1252 on Windows) may not be able to encode them.
        logging.FileHandler('nexusdb.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)


def log_operation(func):
    """Decorator: log the start and the success or failure of each call to *func*.

    Exceptions are logged with their traceback and then re-raised unchanged.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # %-style arguments are formatted lazily, only if the record is emitted.
        logger.info("开始执行: %s", func.__name__)
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            # logger.exception also records the traceback, unlike logger.error.
            logger.exception("执行失败: %s, 错误: %s", func.__name__, e)
            raise
        logger.info("成功完成: %s", func.__name__)
        return result

    return wrapper
总结
NexusDB 项目展示了如何将大语言模型与传统数据库查询相结合,构建智能化的查询系统:通过三层模块化设计、可替换的 AI 服务客户端、FastAPI 接口和 PyQt5 图形界面,将自然语言问题转换为可执行的 MongoDB 查询,并对结果进行多轮筛选与摘要。