项目概述

NexusDB 是一个基于 PyQt5 的桌面应用程序,集成了 AI 辅助功能,用于生成和执行 MongoDB 查询。它提供了一个直观的图形界面,允许用户输入中文问题,自动生成优化的 MongoDB 查询,并支持多轮筛选和文档分析。

核心功能

  • 智能查询生成: 使用 AI 将自然语言问题转换为 MongoDB 查询
  • 黑名单管理: 支持设置不希望出现在查询中的词汇
  • 多类型查询: 支持多种查询模式(首次查询、内容过滤、独立过滤)
  • 实时结果展示: 即时显示查询结果和处理状态
  • RESTful API: 提供完整的 Web 服务接口

架构设计

三层架构

项目采用经典的三层架构设计:

# 架构层次
1. GUI 层 (Presentation Layer) - main.py
2. 业务逻辑层 (Business Logic Layer) - Core/ 目录
3. 工具层 (Tool Layer) - Tool/ 目录

核心模块关系

main.py (GUI) → Core/QuestionBuilder.py → Tool/DatabaseConnector.py → MongoDB

核心模块详解

1. GUI 模块 (main.py)

PyQt5 界面设计方法

class DataInputWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setup_ui()
        
    def setup_ui(self):
        # 主布局
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QHBoxLayout(central_widget)
        
        # 左侧输入区域
        left_widget = QWidget()
        left_layout = QVBoxLayout(left_widget)
        
        # 问题输入框
        self.question_input = QTextEdit()
        self.question_input.setPlaceholderText("请输入您的问题...")
        left_layout.addWidget(QLabel("问题输入:"))
        left_layout.addWidget(self.question_input)
        
        # 黑名单管理
        self.banned_list = QVBoxLayout()
        left_layout.addWidget(QLabel("黑名单词汇:"))
        left_layout.addLayout(self.banned_list)
        
        # 处理按钮
        self.process_btn = QPushButton("处理")
        self.process_btn.clicked.connect(self.handle_process)
        left_layout.addWidget(self.process_btn)
        
        # 右侧结果区域
        right_widget = QWidget()
        right_layout = QVBoxLayout(right_widget)
        self.result_display = QTextEdit()
        self.result_display.setReadOnly(True)
        right_layout.addWidget(QLabel("处理结果:"))
        right_layout.addWidget(self.result_display)
        
        main_layout.addWidget(left_widget, 1)
        main_layout.addWidget(right_widget, 2)

2. AI 查询生成器 (Core/QuestionBuilder.py)

基础查询构造器类

class BaseQuestionConstructor(ABC):
    def __init__(self, core_issue: str, banned_words: List[str] = None):
        self.core_issue = core_issue
        self.banned_words = banned_words or []
        self.robot = ArkChatClient()
    
    @abstractmethod
    def _generate_prompt(self) -> str:
        pass
    
    def _get_ai_back(self, prompt: str) -> str:
        try:
            response = self.robot.chat_completions_create(
                model="ep-20250731092606-cgqmr",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.1,
                max_tokens=2000
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"AI 服务错误: {str(e)}"
    
    def construct_question(self) -> Dict:
        prompt = self._generate_prompt()
        ai_response = self._get_ai_back(prompt)
        return self._parse_response(ai_response)

首次查询构造器实现

class FirstQuestionConstructor(BaseQuestionConstructor):
    def _generate_prompt(self) -> str:
        prompt = f"""将以下中文问题转换为英文关键词,并生成MongoDB查询:
        
        原始问题: {self.core_issue}
        {"排除词汇: " + ", ".join(self.banned_words) if self.banned_words else ""}
        
        要求:
        1. 提取核心英文关键词
        2. 生成MongoDB查询JSON
        3. 使用$regex进行模糊匹配
        4. 使用$options: "i"忽略大小写
        5. 使用$or操作符覆盖多个字段
        6. 排除黑名单词汇
        
        返回格式:
        {{
            "english_keywords": ["keyword1", "keyword2"],
            "mongodb_query": {{
                "$or": [
                    {{"title": {{"$regex": "pattern", "$options": "i"}}}},
                    {{"info": {{"$regex": "pattern", "$options": "i"}}}},
                    {{"codeOrCase": {{"$regex": "pattern", "$options": "i"}}}}
                ],
                "title": {{"$not": {{"$regex": "banned_pattern"}}}}
            }}
        }}
        """
        return prompt

3. 数据库连接器 (Tool/DatabaseConnector.py)

MongoDB 查询执行方法

class DatabaseConnector:
    def __init__(self):
        self.client = MongoClient("mongodb://localhost:27017/")
        self.db = self.client["dbNewSave"]
    
    def check_db_collection(self, collection_name: str, query: Dict) -> List[Dict]:
        """执行MongoDB查询并返回结果"""
        try:
            collection = self.db[collection_name]
            results = list(collection.find(query))
            return results
        except Exception as e:
            print(f"数据库查询错误: {e}")
            return []
    
    def get_docs_by_object_ids(self, collection_name: str, object_ids: List[str]) -> List[Dict]:
        """根据ObjectId列表查询文档"""
        try:
            collection = self.db[collection_name]
            object_id_list = [ObjectId(oid) for oid in object_ids]
            query = {"_id": {"$in": object_id_list}}
            return list(collection.find(query))
        except Exception as e:
            print(f"ObjectId查询错误: {e}")
            return []

API 服务开发

FastAPI Web 服务 (Tool/fastAPItest.py)

完整的 RESTful API 实现

app = FastAPI()
chain_managers = {}
chain_lock = threading.Lock()

class ChainManager:
    def __init__(self, core_issue: str, banned_words: List[str]):
        self.chain_id = str(uuid.uuid4())
        self.core_issue = core_issue
        self.banned_words = banned_words
        self.current_status = "waiting_order"
        self.epoch = 1
        self.current_step = ""
        self.history_answer = ""
        self.key_point = ""

@app.post("/chain_creator")
async def chain_creator(request: ChainCreateRequest):
    """创建新的查询链"""
    with chain_lock:
        chain_manager = ChainManager(request.core_issue, request.banned_words)
        chain_managers[chain_manager.chain_id] = chain_manager
        
    return {"chain_id": chain_manager.chain_id}

@app.get("/get_chain_info/{chain_id}")
async def get_chain_info(chain_id: str):
    """获取查询链信息"""
    if chain_id not in chain_managers:
        raise HTTPException(status_code=404, detail="Chain not found")
    
    chain = chain_managers[chain_id]
    return {
        "current_status": chain.current_status,
        "core_issue": chain.core_issue,
        "banned_words": chain.banned_words,
        "chain_id": chain.chain_id,
        "epoch": chain.epoch,
        "current_step": chain.current_step,
        "history_answer": chain.history_answer,
        "key_point": chain.key_point if chain.current_status in ["secondary_analyze", "end"] else None
    }

查询处理流程

多步骤处理机制

# 完整的查询处理流程
def full_processing_flow(core_issue: str, banned_words: List[str], collection_name: str):
    # 1. 生成首次查询
    first_constructor = FirstQuestionConstructor(core_issue, banned_words)
    first_query = first_constructor.construct_question()
    
    # 2. 执行数据库查询
    connector = DatabaseConnector()
    initial_results = connector.check_db_collection(collection_name, first_query["mongodb_query"])
    
    # 3. 内容过滤
    if len(initial_results) > 9:
        content_filter = ContentFilterConstructor(core_issue, banned_words)
        filtered_results = content_filter.filter_results(initial_results)
    else:
        filtered_results = initial_results
    
    # 4. 独立过滤
    isolated_filter = IsolatedFilterConstructor(core_issue, banned_words)
    final_results = isolated_filter.filter_results(filtered_results)
    
    # 5. 生成摘要
    if final_results:
        summarizer = HTMLDocumentSummarizer()
        summary = summarizer.generate_summary(final_results[0])
        return summary
    
    return "未找到相关结果"

AI 集成方法

多AI服务支持

class ArkChatClient:
    def __init__(self):
        self.base_url = "https://ark.cn-beijing.volces.com/api/v3"
        self.api_key = os.getenv("ARK_CHAT_API_KEY", "default-key")
        self.client = OpenAI(
            base_url=self.base_url,
            api_key=self.api_key
        )
    
    def chat_completions_create(self, **kwargs):
        return self.client.chat.completions.create(**kwargs)

class SiliconFlowClient:
    def __init__(self):
        self.base_url = "https://api.siliconflow.cn/v1"
        self.api_key = os.getenv("SILICONFLOW_API_KEY")
        self.client = OpenAI(
            base_url=self.base_url,
            api_key=self.api_key
        )
    
    def chat_completions_create(self, **kwargs):
        # SiliconFlow 使用不同的参数格式
        adapted_kwargs = {
            "model": kwargs.get("model", "Qwen/Qwen3-235B-A22B-Thinking-2507"),
            "messages": kwargs["messages"],
            "temperature": kwargs.get("temperature", 0.1),
            "max_tokens": kwargs.get("max_tokens", 2000)
        }
        return self.client.chat.completions.create(**adapted_kwargs)

并发处理与状态管理

线程安全的链管理

class ChainManager:
    def __init__(self, core_issue: str, banned_words: List[str]):
        self.chain_id = str(uuid.uuid4())
        self.core_issue = core_issue
        self.banned_words = banned_words
        self.current_status = "waiting_order"
        self.lock = threading.Lock()
        self.results = []
    
    def update_status(self, new_status: str):
        """线程安全的状态更新"""
        with self.lock:
            self.current_status = new_status
    
    def add_results(self, new_results: List[Dict]):
        """添加查询结果"""
        with self.lock:
            self.results.extend(new_results)
    
    def get_results(self) -> List[Dict]:
        """获取当前结果"""
        with self.lock:
            return self.results.copy()

开发最佳实践

1. 错误处理模式

def safe_database_operation(func):
    """数据库操作装饰器"""
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ConnectionError as e:
            print(f"数据库连接错误: {e}")
            return []
        except TimeoutError as e:
            print(f"查询超时: {e}")
            return []
        except Exception as e:
            print(f"未知错误: {e}")
            return []
    return wrapper

@safe_database_operation
def execute_query(collection_name: str, query: Dict) -> List[Dict]:
    collection = db[collection_name]
    return list(collection.find(query))

2. 配置管理

class Config:
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.load_config()
        return cls._instance
    
    def load_config(self):
        self.mongodb_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017/")
        self.database_name = os.getenv("DATABASE_NAME", "dbNewSave")
        self.ai_provider = os.getenv("AI_PROVIDER", "ark_chat")
        self.max_results = int(os.getenv("MAX_RESULTS", "9"))

3. 日志记录

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('nexusdb.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def log_operation(func):
    """操作日志装饰器"""
    def wrapper(*args, **kwargs):
        logger.info(f"开始执行: {func.__name__}")
        try:
            result = func(*args, **kwargs)
            logger.info(f"成功完成: {func.__name__}")
            return result
        except Exception as e:
            logger.error(f"执行失败: {func.__name__}, 错误: {e}")
            raise
    return wrapper

总结

NexusDB 项目展示了如何将现代 AI 技术与传统数据库查询相结合,创建智能化的查询系统。通过模块化设计、多AI服务支持、完整的API接口和友好的GUI界面,该项目提供了一个强大的开发框架。