利用Python的多进程机制、Selenium浏览器自动化工具,以及Dify平台的大语言模型(LLM)来批量清洗HTML文件中的数据。项目的目标是从一批本地HTML文件中提取关键信息(如标题、代码示例、解释等),并将其结构化为JSON格式。

核心思路

  • 使用Selenium加载本地HTML文件,获取页面源代码。

  • 通过API调用Dify的工作流,将源代码传入大模型进行清洗和提取。

  • 采用多进程加速处理(我设置了4个进程),避免单线程的瓶颈。

  • 处理完后合并结果,并记录已完成的URL以防重复工作。

代码详解

辅助函数:JSON处理和正则备份

代码开头定义了几个辅助函数,主要用于处理大模型返回的JSON字符串,因为大模型的输出有时不完美。

  • use_re_keep_json(target_string): 这是一个备用方案。如果直接json.loads()失败,就用正则表达式从字符串中提取四个字段("提取"、"标题"、"代码或指令示例"、"详细解释"),然后手动构建字典再转JSON。为什么需要这个?因为大模型有时会输出带Markdown或额外文本的JSON,导致解析出错。

  • preprocess_json_string(json_str): 用于转义代码段中的双引号,避免JSON解析时出错。使用正则替换非键值的双引号为\"。

  • read_json_array(file_path) load_finish_urls(): 读取输入的JSON数组(包含HTML路径元数据)和已完成URL列表,确保不重复处理。

核心逻辑:调用Dify大模型

run_workflow_and_get_clean_output(targetInfo) 是核心,通过POST请求调用Dify的API(URL: http://localhost/v1/workflows/run),传入HTML源代码作为输入。

  • 请求设置: 使用stream=True启用流式响应,因为Dify的输出是流式的(SSE格式)。headers带上了API密钥。

  • 处理流式输出: 通过response.iter_lines()逐行读取。忽略非JSON行,只捕获event: "workflow_finished"事件,并从中提取outputs["text"]作为最终输出。

  • 输出清洗: 先用正则移除可能的<think>标签(主要应对能思考的大模型)。然后匹配Markdown包裹的JSON(````json ... ```),提取纯JSON字符串。最后尝试json.loads(),如果失败就 fallback 到正则提取。

如果一切顺利,返回一个字典;否则返回错误信息。

多进程任务处理:process_tasks(pid, tasks, finish_url)

每个进程负责一部分任务列表(HTML元数据)。

  • Selenium初始化: 使用headless Chrome加载本地HTML文件(file:///path),获取page_source。

  • 逐文件处理: 调用上述核心函数清洗数据。如果成功,添加sourceURL字段,存入列表,并更新finish_url。每10个任务保存一次中间结果(JSON文件),以防进程崩溃。

  • 错误处理: 如果出错,打印日志并重启Selenium driver。进程结束时保存最终结果。

为了避免TPM的问题,添加了sleep(1)sleep(5)来控制节奏。

遇到的问题及解决方案

在开发过程中,发现几个典型的大模型集成痛点,尤其是Dify这种流式API的调用。

大模型给出的结果直接转换JSON有问题

大模型的输出有时不是纯JSON,比如裹在Markdown代码块里(````json ... ```),或者包含额外文本、未转义的双引号,甚至格式不标准。这导致json.loads()直接抛异常。

解决方案:

  • 先用正则匹配提取纯JSON部分(re.search(r'```json\n([\s\S]*?)\n```'))。

  • 如果还是失败,启用use_re_keep_json函数,通过四个独立的正则匹配每个字段的内容,然后自动组装字典。

  • 额外加了preprocess_json_string来转义代码段的双引号,确保JSON有效。

这个方案让成功率大大提升,大语言模型的输出并不总是完美的必须有后处理来兜底。

模型思考过慢

Dify的工作流中,大模型有时会“思考”很久,尤其是输入HTML很长时。单进程处理时,整个脚本卡住;多进程下,如果不控制节奏,可能会导致API限流或服务器压力大。

解决方案:

  • 引入多进程(multiprocessing),并行处理任务。

  • 使用多个运营商缓解TPM问题,这样能极大增加效率。

  • 在循环中添加sleep(1)(每个任务后)和sleep(5)(每10个任务后),给模型缓冲时间,也避免频繁请求被运营商限流。

流式输出的获取方法

Dify的API是流式(streaming),不是一次性返回全部响应。如果用普通response.text,会卡住或只拿到部分数据。

解决方案:

  • 设置stream=True,然后用response.iter_lines()迭代读取每行。

  • 去除前缀"data: ",尝试JSON解析,只关注"event": "workflow_finished"事件(这是Dify的结束标志)。

  • 忽略中间的无效行,确保只提取最终输出。

完整代码

import json
import os
import re
import multiprocessing
from time import sleep

import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
import time

# 这里是备用方案,如果大模型给出的结果直接转换json有问题,那么通过正则匹配先获取内容,存入字典中,再转换为json
def use_re_keep_json(target_string):
    extract_part = re.search(r'"提取":\s*(.*?)\s*,\s*"标题"', target_string).group(1)
    title_part = re.search(r'"标题":\s*(.*?)\s*,\s*"代码或指令示例"', target_string).group(1)
    code_part = re.search(r'"代码或指令示例":\s*(.*?)\s*,\s*"详细解释"', target_string, re.DOTALL).group(1)
    explanation_part = re.search(r'"详细解释":\s*(.*)\s*}', target_string, re.DOTALL).group(1)[:-1]
    return json.loads(json.dumps({"提取":extract_part,"标题":title_part,"代码或指令示例":code_part,"详细解释":explanation_part}))

def preprocess_json_string(json_str):
    # 匹配代码段中的双引号(排除JSON键值对的双引号)
    pattern = r'(?<!\\)"(?=[^"]*"[^"]*(?:"[^"]*"[^"]*)*$)'
    # 替换代码段中的双引号为转义形式
    processed_str = re.sub(pattern, r'\"', json_str)
    return processed_str


def read_json_array(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"读取JSON文件出错: {e}")
        return []

# 获取下之前清理完成的路径保证不重复工作
def load_finish_urls():
    try:
        if os.path.exists('finish_url.json'):
            with open('finish_url.json', 'r', encoding='utf-8') as f:
                return json.load(f)
    except Exception as e:
        print(f"加载finish_url.json出错: {e}")
    return []

# 核心代码访问dify里面制作好的大语言模型来完成数据清洗
def run_workflow_and_get_clean_output(targetInfo):
    url = "http://localhost/v1/workflows/run"
    payload = json.dumps({
        "inputs": {
            "targetHTML": str(targetInfo),
        },
        "response_mode": "streaming",
        "user": "abc-123"
    })
    headers = {
        'Authorization': 'Bearer dify的API',
        'Content-Type': 'application/json'
    }

    try:
        # dify用的是流式输出
        response = requests.post(url, headers=headers, data=payload, stream=True)
        response.raise_for_status()  # 检查HTTP错误

        final_output = None
        for line in response.iter_lines():
            if line:
                # 解码并尝试解析JSON
                decoded_line = line.decode('utf-8')
                decoded_line = decoded_line.replace("data: ", '')
                try:
                    event_data = json.loads(decoded_line)
                    if event_data.get("event") == "workflow_finished": # 通过该标志获取流式的结尾
                        final_output = event_data["data"]["outputs"]["text"]
                        break
                except json.JSONDecodeError:
                    continue  # 这里如果出问题不是大语言模型的输出问题,而是dify返回的格式出问题了,这里可以不管

        if final_output is None:
            return {"error": "未捕获到工作流结束事件"}


        # 应对如果由think的情况但是目前用不上
        cleaned_output = re.sub(r'^.*?</think>\s*\n?', '', final_output, flags=re.DOTALL)

        # 如果正则匹配失败,则使用split方法
        if cleaned_output == final_output:
            parts = final_output.split("</think>", 1)
            cleaned_output = parts[-1].lstrip() if len(parts) > 1 else final_output

        # 返回解析后的JSON对象
        try:
            pattern = r'```json\n([\s\S]*?)\n```'# 有时候大模型返回的json会用markdown的方法标注起来需要清理
            match = re.search(pattern, cleaned_output)
            cleaned_output = match.group(1)
            return json.loads(cleaned_output)
        except:
            return json.loads(cleaned_output)
    except requests.exceptions.RequestException as e:
        return {"error": f"请求失败: {str(e)}"}
    except json.JSONDecodeError:
        try:# 这里就是大模型返回的问题,如果不是标准json格式那么我们就触发正则匹配去获取json
            aasw = use_re_keep_json(cleaned_output)
            return aasw
        except:
            return {"error": "输出内容不是有效的JSON格式"}


def process_tasks(pid, tasks, finish_url):
    # 使用selenium访问该文件,其实有点没必要,因为后面我看直接获取html的内容也可以,但是结果基本都一样的
    chrome_options = Options()
    chrome_options.add_argument("--headless=new")
    chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    # 初始化WebDriver
    driver = webdriver.Chrome(
        service=Service(),
        options=chrome_options
    )

    allInfo = []
    processed = 0

    try:
        for meta in tasks:
            html_path = os.path.abspath(meta['path'])

            if html_path in finish_url:
                print(f"进程 {pid} 跳过已处理的文件: {html_path}")
                continue

            try:
                print(f"进程 {pid} 正在处理: {html_path}")
                driver.get(f"file:///{html_path}")
                page_source = driver.page_source
                back_info = run_workflow_and_get_clean_output(page_source)

                if "error" not in back_info:
                    back_info['sourceURL'] = html_path
                    allInfo.append(back_info)
                    finish_url.append(html_path)
                    processed += 1
                    sleep(1)
                    # 每处理10个任务保存一次进度
                    if processed % 10 == 0:
                        print(f"进程 {pid} 处理了 {processed} 个任务")
                        timestamp = int(time.time())
                        sleep(5)
                        # 保存当前进程的结果
                        with open(f'results_{pid}_{timestamp}.json', 'w', encoding='utf-8') as f:
                            json.dump(allInfo, f, ensure_ascii=False, indent=4)

                        # 保存当前进程的完成URL,如果是出问题的不会写在里面
                        with open(f'finish_url_{pid}.json', 'w', encoding='utf-8') as f:
                            json.dump(finish_url, f, ensure_ascii=False, indent=4)
                else:
                    print(f"进程 {pid} 处理出错: {back_info['error']} - {html_path}")

            except Exception as e:
                print(f"进程 {pid} 处理文件时出错: {e} - {html_path}")
                try:
                    driver.quit()
                except:
                    pass

                driver = webdriver.Chrome(
                    service=Service(),
                    options=chrome_options
                )

        # 保存最后一次进度,合并每个进程爬取的url
        if allInfo:
            timestamp = int(time.time())
            with open(f'results_{pid}_{timestamp}.json', 'w', encoding='utf-8') as f:
                json.dump(allInfo, f, ensure_ascii=False, indent=4)
            with open(f'finish_url_{pid}.json', 'w', encoding='utf-8') as f:
                json.dump(finish_url, f, ensure_ascii=False, indent=4)

    finally:
        driver.quit()
        print(f"进程 {pid} 完成,共处理 {processed} 个任务")


def merge_results(num_processes):# 合并全部结果
    all_results = []
    finish_urls = set()

    # 合并完成URL
    for pid in range(num_processes):
        file_name = f'finish_url_{pid}.json'
        if os.path.exists(file_name):
            try:
                with open(file_name, 'r', encoding='utf-8') as f:
                    urls = json.load(f)
                    finish_urls.update(urls)
            except Exception as e:
                print(f"合并URL文件时出错: {e}")

    # 保存合并后的finish_url
    with open('finish_url.json', 'w', encoding='utf-8') as f:
        json.dump(list(finish_urls), f, ensure_ascii=False, indent=4)

    # 合并结果文件
    result_files = [f for f in os.listdir() if f.startswith('results_') and f.endswith('.json')]
    for file in result_files:
        try:
            with open(file, 'r', encoding='utf-8') as f:
                data = json.load(f)
                all_results.extend(data)
        except Exception as e:
            print(f"合并结果文件{file}时出错: {e}")

    # 保存合并结果
    if all_results:
        timestamp = int(time.time())
        with open(f'final_results_{timestamp}.json', 'w', encoding='utf-8') as f:
            json.dump(all_results, f, ensure_ascii=False, indent=4)

    print(f"合并完成,共处理 {len(finish_urls)} 个URL,生成 {len(all_results)} 条记录")


if __name__ == "__main__":
    num_processes = 4
    file_path = r"目标文件位置"  # 注意原始路径格式

    # 读取原始数据
    original_data = read_json_array(file_path)
    existing_finish_urls = load_finish_urls()

    # 将清理过的url过一遍
    tasks_to_process = [
        meta for meta in original_data
        if meta['type_id'] in [0, 3, 1, 6, 16, 10]
           and os.path.abspath(meta['path']) not in existing_finish_urls
    ]

    print(f"总任务数: {len(original_data)},需处理数: {len(tasks_to_process)}")

    # 分配任务给不同进程
    chunk_size = len(tasks_to_process) // num_processes
    processes = []

    for i in range(num_processes):
        start = i * chunk_size
        # 最后一个进程处理剩余的所有任务
        end = (i + 1) * chunk_size if i < num_processes - 1 else len(tasks_to_process)
        assigned_tasks = tasks_to_process[start:end]

        # 每个进程使用自己的完成URL列表,这里就不弄共享列表了太麻烦了
        process_finish_urls = existing_finish_urls.copy()

        p = multiprocessing.Process(
            target=process_tasks,  # 传递函数引用
            args=(i, assigned_tasks, process_finish_urls)
        )
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    merge_results(num_processes)