利用Python的多进程机制、Selenium浏览器自动化工具,以及Dify平台的大语言模型(LLM)来批量清洗HTML文件中的数据。项目的目标是从一批本地HTML文件中提取关键信息(如标题、代码示例、解释等),并将其结构化为JSON格式。
核心思路
使用Selenium加载本地HTML文件,获取页面源代码。
通过API调用Dify的工作流,将源代码传入大模型进行清洗和提取。
采用多进程加速处理(我设置了4个进程),避免单线程的瓶颈。
处理完后合并结果,并记录已完成的URL以防重复工作。
代码详解
辅助函数:JSON处理和正则备份
代码开头定义了几个辅助函数,主要用于处理大模型返回的JSON字符串,因为大模型的输出有时不完美。
use_re_keep_json(target_string): 这是一个备用方案。如果直接json.loads()失败,就用正则表达式从字符串中提取四个字段("提取"、"标题"、"代码或指令示例"、"详细解释"),然后手动构建字典再转JSON。为什么需要这个?因为大模型有时会输出带Markdown或额外文本的JSON,导致解析出错。
preprocess_json_string(json_str): 用于转义代码段中的双引号,避免JSON解析时出错。使用正则替换非键值的双引号为\"。
read_json_array(file_path) 和 load_finish_urls(): 读取输入的JSON数组(包含HTML路径元数据)和已完成URL列表,确保不重复处理。
核心逻辑:调用Dify大模型
run_workflow_and_get_clean_output(targetInfo) 是核心,通过POST请求调用Dify的API(URL: http://localhost/v1/workflows/run),传入HTML源代码作为输入。
请求设置: 使用stream=True启用流式响应,因为Dify的输出是流式的(SSE格式)。headers带上了API密钥。
处理流式输出: 通过response.iter_lines()逐行读取。忽略非JSON行,只捕获event: "workflow_finished"事件,并从中提取outputs["text"]作为最终输出。
输出清洗: 先用正则移除可能的<think>标签(主要应对能思考的大模型)。然后匹配Markdown包裹的JSON(````json ... ```),提取纯JSON字符串。最后尝试json.loads(),如果失败就 fallback 到正则提取。
如果一切顺利,返回一个字典;否则返回错误信息。
多进程任务处理:process_tasks(pid, tasks, finish_url)
每个进程负责一部分任务列表(HTML元数据)。
Selenium初始化: 使用headless Chrome加载本地HTML文件(file:///path),获取page_source。
逐文件处理: 调用上述核心函数清洗数据。如果成功,添加sourceURL字段,存入列表,并更新finish_url。每10个任务保存一次中间结果(JSON文件),以防进程崩溃。
错误处理: 如果出错,打印日志并重启Selenium driver。进程结束时保存最终结果。
为了避免TPM的问题,添加了sleep(1)和sleep(5)来控制节奏。
遇到的问题及解决方案
在开发过程中,发现几个典型的大模型集成痛点,尤其是Dify这种流式API的调用。
大模型给出的结果直接转换JSON有问题
大模型的输出有时不是纯JSON,比如裹在Markdown代码块里(````json ... ```),或者包含额外文本、未转义的双引号,甚至格式不标准。这导致json.loads()直接抛异常。
解决方案:
先用正则匹配提取纯JSON部分(re.search(r'```json\n([\s\S]*?)\n```'))。
如果还是失败,启用use_re_keep_json函数,通过四个独立的正则匹配每个字段的内容,然后自动组装字典。
额外加了preprocess_json_string来转义代码段的双引号,确保JSON有效。
这个方案让成功率大大提升,大语言模型的输出并不总是完美的必须有后处理来兜底。
模型思考过慢
Dify的工作流中,大模型有时会“思考”很久,尤其是输入HTML很长时。单进程处理时,整个脚本卡住;多进程下,如果不控制节奏,可能会导致API限流或服务器压力大。
解决方案:
引入多进程(multiprocessing),并行处理任务。
使用多个运营商缓解TPM问题,这样能极大增加效率。
在循环中添加sleep(1)(每个任务后)和sleep(5)(每10个任务后),给模型缓冲时间,也避免频繁请求被运营商限流。
流式输出的获取方法
Dify的API是流式(streaming),不是一次性返回全部响应。如果用普通response.text,会卡住或只拿到部分数据。
解决方案:
设置stream=True,然后用response.iter_lines()迭代读取每行。
去除前缀"data: ",尝试JSON解析,只关注"event": "workflow_finished"事件(这是Dify的结束标志)。
忽略中间的无效行,确保只提取最终输出。
完整代码
import json
import os
import re
import multiprocessing
from time import sleep
import requests
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
import time
# 这里是备用方案,如果大模型给出的结果直接转换json有问题,那么通过正则匹配先获取内容,存入字典中,再转换为json
def use_re_keep_json(target_string):
extract_part = re.search(r'"提取":\s*(.*?)\s*,\s*"标题"', target_string).group(1)
title_part = re.search(r'"标题":\s*(.*?)\s*,\s*"代码或指令示例"', target_string).group(1)
code_part = re.search(r'"代码或指令示例":\s*(.*?)\s*,\s*"详细解释"', target_string, re.DOTALL).group(1)
explanation_part = re.search(r'"详细解释":\s*(.*)\s*}', target_string, re.DOTALL).group(1)[:-1]
return json.loads(json.dumps({"提取":extract_part,"标题":title_part,"代码或指令示例":code_part,"详细解释":explanation_part}))
def preprocess_json_string(json_str):
# 匹配代码段中的双引号(排除JSON键值对的双引号)
pattern = r'(?<!\\)"(?=[^"]*"[^"]*(?:"[^"]*"[^"]*)*$)'
# 替换代码段中的双引号为转义形式
processed_str = re.sub(pattern, r'\"', json_str)
return processed_str
def read_json_array(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"读取JSON文件出错: {e}")
return []
# 获取下之前清理完成的路径保证不重复工作
def load_finish_urls():
try:
if os.path.exists('finish_url.json'):
with open('finish_url.json', 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"加载finish_url.json出错: {e}")
return []
# 核心代码访问dify里面制作好的大语言模型来完成数据清洗
def run_workflow_and_get_clean_output(targetInfo):
url = "http://localhost/v1/workflows/run"
payload = json.dumps({
"inputs": {
"targetHTML": str(targetInfo),
},
"response_mode": "streaming",
"user": "abc-123"
})
headers = {
'Authorization': 'Bearer dify的API',
'Content-Type': 'application/json'
}
try:
# dify用的是流式输出
response = requests.post(url, headers=headers, data=payload, stream=True)
response.raise_for_status() # 检查HTTP错误
final_output = None
for line in response.iter_lines():
if line:
# 解码并尝试解析JSON
decoded_line = line.decode('utf-8')
decoded_line = decoded_line.replace("data: ", '')
try:
event_data = json.loads(decoded_line)
if event_data.get("event") == "workflow_finished": # 通过该标志获取流式的结尾
final_output = event_data["data"]["outputs"]["text"]
break
except json.JSONDecodeError:
continue # 这里如果出问题不是大语言模型的输出问题,而是dify返回的格式出问题了,这里可以不管
if final_output is None:
return {"error": "未捕获到工作流结束事件"}
# 应对如果由think的情况但是目前用不上
cleaned_output = re.sub(r'^.*?</think>\s*\n?', '', final_output, flags=re.DOTALL)
# 如果正则匹配失败,则使用split方法
if cleaned_output == final_output:
parts = final_output.split("</think>", 1)
cleaned_output = parts[-1].lstrip() if len(parts) > 1 else final_output
# 返回解析后的JSON对象
try:
pattern = r'```json\n([\s\S]*?)\n```'# 有时候大模型返回的json会用markdown的方法标注起来需要清理
match = re.search(pattern, cleaned_output)
cleaned_output = match.group(1)
return json.loads(cleaned_output)
except:
return json.loads(cleaned_output)
except requests.exceptions.RequestException as e:
return {"error": f"请求失败: {str(e)}"}
except json.JSONDecodeError:
try:# 这里就是大模型返回的问题,如果不是标准json格式那么我们就触发正则匹配去获取json
aasw = use_re_keep_json(cleaned_output)
return aasw
except:
return {"error": "输出内容不是有效的JSON格式"}
def process_tasks(pid, tasks, finish_url):
# 使用selenium访问该文件,其实有点没必要,因为后面我看直接获取html的内容也可以,但是结果基本都一样的
chrome_options = Options()
chrome_options.add_argument("--headless=new")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
# 初始化WebDriver
driver = webdriver.Chrome(
service=Service(),
options=chrome_options
)
allInfo = []
processed = 0
try:
for meta in tasks:
html_path = os.path.abspath(meta['path'])
if html_path in finish_url:
print(f"进程 {pid} 跳过已处理的文件: {html_path}")
continue
try:
print(f"进程 {pid} 正在处理: {html_path}")
driver.get(f"file:///{html_path}")
page_source = driver.page_source
back_info = run_workflow_and_get_clean_output(page_source)
if "error" not in back_info:
back_info['sourceURL'] = html_path
allInfo.append(back_info)
finish_url.append(html_path)
processed += 1
sleep(1)
# 每处理10个任务保存一次进度
if processed % 10 == 0:
print(f"进程 {pid} 处理了 {processed} 个任务")
timestamp = int(time.time())
sleep(5)
# 保存当前进程的结果
with open(f'results_{pid}_{timestamp}.json', 'w', encoding='utf-8') as f:
json.dump(allInfo, f, ensure_ascii=False, indent=4)
# 保存当前进程的完成URL,如果是出问题的不会写在里面
with open(f'finish_url_{pid}.json', 'w', encoding='utf-8') as f:
json.dump(finish_url, f, ensure_ascii=False, indent=4)
else:
print(f"进程 {pid} 处理出错: {back_info['error']} - {html_path}")
except Exception as e:
print(f"进程 {pid} 处理文件时出错: {e} - {html_path}")
try:
driver.quit()
except:
pass
driver = webdriver.Chrome(
service=Service(),
options=chrome_options
)
# 保存最后一次进度,合并每个进程爬取的url
if allInfo:
timestamp = int(time.time())
with open(f'results_{pid}_{timestamp}.json', 'w', encoding='utf-8') as f:
json.dump(allInfo, f, ensure_ascii=False, indent=4)
with open(f'finish_url_{pid}.json', 'w', encoding='utf-8') as f:
json.dump(finish_url, f, ensure_ascii=False, indent=4)
finally:
driver.quit()
print(f"进程 {pid} 完成,共处理 {processed} 个任务")
def merge_results(num_processes):# 合并全部结果
all_results = []
finish_urls = set()
# 合并完成URL
for pid in range(num_processes):
file_name = f'finish_url_{pid}.json'
if os.path.exists(file_name):
try:
with open(file_name, 'r', encoding='utf-8') as f:
urls = json.load(f)
finish_urls.update(urls)
except Exception as e:
print(f"合并URL文件时出错: {e}")
# 保存合并后的finish_url
with open('finish_url.json', 'w', encoding='utf-8') as f:
json.dump(list(finish_urls), f, ensure_ascii=False, indent=4)
# 合并结果文件
result_files = [f for f in os.listdir() if f.startswith('results_') and f.endswith('.json')]
for file in result_files:
try:
with open(file, 'r', encoding='utf-8') as f:
data = json.load(f)
all_results.extend(data)
except Exception as e:
print(f"合并结果文件{file}时出错: {e}")
# 保存合并结果
if all_results:
timestamp = int(time.time())
with open(f'final_results_{timestamp}.json', 'w', encoding='utf-8') as f:
json.dump(all_results, f, ensure_ascii=False, indent=4)
print(f"合并完成,共处理 {len(finish_urls)} 个URL,生成 {len(all_results)} 条记录")
if __name__ == "__main__":
num_processes = 4
file_path = r"目标文件位置" # 注意原始路径格式
# 读取原始数据
original_data = read_json_array(file_path)
existing_finish_urls = load_finish_urls()
# 将清理过的url过一遍
tasks_to_process = [
meta for meta in original_data
if meta['type_id'] in [0, 3, 1, 6, 16, 10]
and os.path.abspath(meta['path']) not in existing_finish_urls
]
print(f"总任务数: {len(original_data)},需处理数: {len(tasks_to_process)}")
# 分配任务给不同进程
chunk_size = len(tasks_to_process) // num_processes
processes = []
for i in range(num_processes):
start = i * chunk_size
# 最后一个进程处理剩余的所有任务
end = (i + 1) * chunk_size if i < num_processes - 1 else len(tasks_to_process)
assigned_tasks = tasks_to_process[start:end]
# 每个进程使用自己的完成URL列表,这里就不弄共享列表了太麻烦了
process_finish_urls = existing_finish_urls.copy()
p = multiprocessing.Process(
target=process_tasks, # 传递函数引用
args=(i, assigned_tasks, process_finish_urls)
)
p.start()
processes.append(p)
for p in processes:
p.join()
merge_results(num_processes)