提交 20a702a1 authored 作者: 贺阳's avatar 贺阳

ai通过坐标处理,增加通过跳过ocr直接用ai处理的开关

上级 571454e3
......@@ -41,6 +41,12 @@ class BatchGetPodInfoWizard(models.TransientModel):
help='Whether to remove specified text from PDF files' # 是否涂抹PDF中的指定文字
)
skip_ocr_direct_ai = fields.Boolean(
string='Skip OCR Direct AI', # 跳过OCR直接使用AI
default=False,
help='Whether to skip OCR processing and directly use AI processing (for testing AI)' # 是否跳过OCR处理,直接使用AI处理(用于测试AI)
)
sync_match_node = fields.Boolean(
string='Sync Push Match Node', # 同步推送匹配节点
default=False,
......@@ -88,7 +94,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
# 如果启用了涂抹文字,进行处理
if self.remove_specified_text and processed_files:
# processed_files = self._remove_specified_text(processed_files, debug_mode=False)
processed_files = self._remove_specified_text(processed_files, debug_mode=False)
# 合并PDF并保存到pdf_file字段
self._merge_pdf_files(processed_files)
......@@ -636,13 +642,16 @@ class BatchGetPodInfoWizard(models.TransientModel):
def _remove_specified_text(self, processed_files, debug_mode=False):
"""
移除PDF中的指定文字:先用OCR处理,检查是否还存在,如果存在则用AI处理,再次检查
移除PDF中的指定文字:
- 如果skip_ocr_direct_ai为True:直接使用AI处理,跳过OCR
- 如果skip_ocr_direct_ai为False:先用OCR处理,检查是否还存在,如果存在则用AI处理,再次检查
:param processed_files: 处理后的文件数组
:param debug_mode: 是否显示调试标记
:return: 处理后的文件数组(包含处理后的PDF数据)
"""
updated_files = []
error_messages = []
skip_ocr = self.skip_ocr_direct_ai # 是否跳过OCR直接使用AI
for file_info in processed_files:
if not file_info['bl']:
......@@ -656,10 +665,43 @@ class BatchGetPodInfoWizard(models.TransientModel):
# 将base64数据转换为二进制
pdf_binary = base64.b64decode(file_data)
# 先提取文本用于后续同步节点功能
# 先提取文本用于后续同步节点功能(如果需要的话)
if 'ocr_texts' not in file_info:
file_info['ocr_texts'] = self._extract_text_from_pdf_with_ocr(pdf_binary, bl.bl_no)
# 如果跳过OCR,直接使用AI处理
if skip_ocr:
_logger.info(f"提单 {bl.bl_no} 跳过OCR,直接使用AI处理")
try:
ai_processed_pdf = self._process_pdf_with_ai_image_edit(
pdf_data=pdf_binary,
bl_no=bl.bl_no
)
if ai_processed_pdf:
processed_file_data = base64.b64encode(ai_processed_pdf).decode('utf-8')
# 检查是否还存在目标文字
final_check_pdf = base64.b64decode(processed_file_data)
text_still_exists, final_found_texts = self._check_target_texts_exist(final_check_pdf, bl.bl_no)
if text_still_exists:
error_msg = f"提单 {bl.bl_no} 经过AI处理后仍存在目标文字: {', '.join(final_found_texts)},请取消该提单操作,手动处理"
_logger.error(error_msg)
error_messages.append(error_msg)
# 不更新文件数据,保持原始状态
processed_file_data = file_data
else:
_logger.info(f"提单 {bl.bl_no} AI处理成功,目标文字已清除")
else:
error_msg = f"提单 {bl.bl_no} AI处理失败"
_logger.error(error_msg)
error_messages.append(error_msg)
except Exception as e:
_logger.error(f"提单 {bl.bl_no} AI处理异常: {str(e)}")
error_msg = f"提单 {bl.bl_no} AI处理异常: {str(e)}"
error_messages.append(error_msg)
else:
# 原有逻辑:先用OCR处理,如果还存在则用AI处理
# 第一步:使用OCR方法处理PDF
_logger.info(f"提单 {bl.bl_no} 开始OCR处理")
try:
......@@ -821,6 +863,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
pdf_document = fitz.open(stream=pdf_data, filetype="pdf")
processed_pages = []
total_pages = len(pdf_document)
total_ai_time = 0.0 # 累计AI总耗时
# 遍历每一页
for page_num in range(total_pages):
......@@ -844,6 +887,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
)
ai_end_time = time.time()
ai_processing_time = ai_end_time - ai_start_time
total_ai_time += ai_processing_time # 累计AI耗时
if edited_img_base64:
# 解码base64图片数据
......@@ -902,8 +946,9 @@ class BatchGetPodInfoWizard(models.TransientModel):
_logger.info(f"AI图片编辑PDF处理完成,提单号: {bl_no}")
_logger.info(f"总处理时间: {total_time:.2f}秒")
_logger.info(f"AI总耗时: {total_ai_time:.2f}秒(累计所有页面的AI处理时间)")
_logger.info(f"PDF创建时间: {pdf_creation_time:.2f}秒")
_logger.info(f"平均每页AI处理时间: {total_time/total_pages:.2f}秒")
_logger.info(f"平均每页AI处理时间: {total_ai_time/total_pages:.2f}秒" if total_pages > 0 else "平均每页AI处理时间: 0.00秒")
return result_data
......
......@@ -13,6 +13,8 @@
<group>
<field name="remove_specified_text" readonly="1" widget="boolean_toggle"
attrs="{'invisible': [('pdf_file', '!=', False)]}"/>
<field name="skip_ocr_direct_ai" readonly="0" widget="boolean_toggle"
attrs="{'invisible': [('pdf_file', '!=', False)]}"/>
</group>
<group>
<field name="sync_last_mile_pod" widget="boolean_toggle"
......
......@@ -6,7 +6,8 @@ import base64
import fitz # PyMuPDF
import json
from PIL import Image, ImageDraw
import time
begin_time = time.time()
client = OpenAI(
# 若没有配置环境变量,请用阿里云百炼API Key将下行替换为:api_key="sk-xxx",
# 新加坡和北京地域的API Key不同。获取API Key:https://help.aliyun.com/zh/model-studio/get-api-key
......@@ -15,7 +16,7 @@ client = OpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
pdf_path = "C:/Users/Administrator/Desktop/43610281036.pdf"
pdf_path = "./43610272216.pdf"
def pdf_to_images(pdf_path, output_dir='./pdf_pages', dpi=150):
"""
......@@ -69,10 +70,16 @@ def safe_extract_json(text: str):
"""从模型返回文本中尽可能鲁棒地提取JSON对象。"""
# 直接尝试解析
try:
return json.loads(text)
if text.startswith("```json"):
text = text[7:-3].strip()
obj = json.loads(text)
if isinstance(obj, list):
return {'rects': obj}
return obj
except Exception:
pass
print(text)
# 尝试提取首尾花括号之间的内容
start = text.find('{')
end = text.rfind('}')
......@@ -169,6 +176,88 @@ def draw_debug_boxes(image_path: str, coords_map: dict, save_path: str):
img.save(save_path)
print(f"调试框已生成: {save_path}")
def convert_ai_json_to_coords_map(result, img_w: int, img_h: int) -> dict:
"""
将AI返回的JSON统一转换为 {label: [x1,y1,x2,y2]} 形式,兼容多种结构:
1) {"rects":[{"text":"AGN","bbox_norm":{x1,y1,x2,y2},"bbox_px":{x1,y1,x2,y2}}]}
2) {"AGN":[x1,y1,x2,y2], "UCLINK":[...], ...}
3) {"rects":[{"label":"AGN","bbox":[x1,y1,x2,y2]}]}
4) {"rects":[{"text":"AGN","x1":...,"y1":...,"x2":...,"y2":...}]}
返回值可以包含像素或归一化坐标,后续由 normalize_bbox 统一处理。
"""
coords_map: dict = {}
def dict_to_list(b):
if isinstance(b, dict):
return [b.get("x1"), b.get("y1"), b.get("x2"), b.get("y2")]
return b
try:
# 情形A:顶层是dict
if isinstance(result, dict):
# A1:包含 rects 列表
if "rects" in result and isinstance(result["rects"], list):
for i, item in enumerate(result["rects"]):
if not isinstance(item, dict):
continue
label = item.get("text") or item.get("label") or item.get("word") or f"rect_{i}"
idx = item.get("occurrence_index")
key = f"{label}#{idx}" if isinstance(idx, int) and idx > 0 else label
bbox_px = dict_to_list(item.get("bbox_px") or item.get("bbox_pixels"))
bbox_norm = dict_to_list(item.get("bbox_norm"))
bbox_generic = dict_to_list(item.get("bbox"))
chosen = None
# 如果同时存在像素和归一化,做一致性校验
if isinstance(bbox_px, (list, tuple)) and len(bbox_px) == 4 and isinstance(bbox_norm, (list, tuple)) and len(bbox_norm) == 4:
try:
px_from_norm = [int(round(float(bbox_norm[0]) * img_w)),
int(round(float(bbox_norm[1]) * img_h)),
int(round(float(bbox_norm[2]) * img_w)),
int(round(float(bbox_norm[3]) * img_h))]
diff = sum(abs(px_from_norm[j] - int(round(float(bbox_px[j])))) for j in range(4))
chosen = bbox_px if diff <= 4 else bbox_norm
except Exception:
chosen = bbox_px
elif isinstance(bbox_px, (list, tuple)) and len(bbox_px) == 4:
chosen = bbox_px
elif isinstance(bbox_norm, (list, tuple)) and len(bbox_norm) == 4:
chosen = bbox_norm
elif isinstance(bbox_generic, (list, tuple)) and len(bbox_generic) == 4:
chosen = bbox_generic
else:
# 直接字段 x1,y1,x2,y2
if all(k in item for k in ("x1", "y1", "x2", "y2")):
chosen = [item.get("x1"), item.get("y1"), item.get("x2"), item.get("y2")]
if isinstance(chosen, (list, tuple)) and len(chosen) == 4:
coords_map[key] = list(chosen)
else:
print(f"跳过无法解析的rect: {item}")
else:
# A2:简单键值对形式
for k, v in result.items():
if isinstance(v, (list, tuple)) and len(v) == 4:
coords_map[k] = list(v)
# 情形B:顶层是list
elif isinstance(result, list):
for i, item in enumerate(result):
if not isinstance(item, dict):
continue
label = item.get("text") or item.get("label") or item.get("word") or f"rect_{i}"
bbox = item.get("bbox_px") or item.get("bbox_norm") or item.get("bbox")
bbox = dict_to_list(bbox)
if isinstance(bbox, (list, tuple)) and len(bbox) == 4:
coords_map[label] = list(bbox)
else:
print("AI返回的JSON结构未知,无法解析。")
except Exception as e:
print(f"解析AI JSON时发生错误: {e}")
return coords_map
def images_to_pdf(image_paths, output_pdf):
os.makedirs(os.path.dirname(output_pdf), exist_ok=True)
pil_images = [Image.open(p).convert('RGB') for p in image_paths]
......@@ -179,6 +268,22 @@ def images_to_pdf(image_paths, output_pdf):
first.save(output_pdf, save_all=True, append_images=rest)
print(f"已生成PDF: {output_pdf}")
text = f"""(仅归一化坐标,严格 JSON)
你是一名版面定位助手。请在下图中定位并分别框出以下四个单词:AGN、UCLINK、LOGISITICS、LTD。
坐标系与输出要求:
- 图像尺寸:宽 {img_w} 像素,高 {img_h} 像素。
- 原点位于图像左上角;x 向右增大,y 向下增大。
- 为每个目标词返回它的最小外接矩形框,边界紧贴字形,不要添加额外边距。
- 返回坐标为相对宽高的归一化浮点数,范围 [0,1],保留 4 位小数;保证 0 ≤ x1 < x2 ≤ 1,0 ≤ y1 < y2 ≤ 1。
- 禁止任何图片预处理(裁剪、缩放、加边距、重采样);坐标必须对应原始图像。
- 严格只输出下面的压缩的 JSON,不要附加解释或其他文本。
- JSON中不要出现不在实例中的参数,例如bbox_2d
输出 JSON 格式(示例为格式演示,实际数值请识别后填充):"""
text += '[{"text":"AGN","bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"UCLINK","bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"LOGISITICS","bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"LTD","bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}}]'
completion = client.chat.completions.create(
model="qwen3-vl-plus", # 此处以qwen3-vl-plus为例,可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/models
messages=[
......@@ -191,13 +296,15 @@ completion = client.chat.completions.create(
"url": image_base64
},
},
{"type": "text", "text": f"图像分辨率为{img_w}x{img_h}像素。坐标系定义:以原始图像左上角为原点(0,0),x向右增加,y向下增加;不要使用任何预处理(缩放或加黑边)产生的坐标。请仅返回这两个文本的矩形框坐标,且必须是归一化到[0,1]的浮点数(相对于原始图像宽高),返回格式严格为压缩JSON、无任何解释:{{\"AGN\": [x1_rel, y1_rel, x2_rel, y2_rel], \"UCLINK LOGISITICS LTD\": [x3_rel, y3_rel, x4_rel, y4_rel]}}。"},
{"type": "text", "text": text},
],
},
],
temperature=0.1,
)
raw_text = completion.choices[0].message.content
# raw_text = '```json[{"bbox_norm": {"x1": 0.1028, "y1": 0.1934, "x2": 0.1325, "y2": 0.2006}, "text": "AGN", "occurrence_index": 0},{"bbox_norm": {"x1": 0.1028, "y1": 0.2057, "x2": 0.1608, "y2": 0.2165}, "text": "UCLINK", "occurrence_index": 0},{"bbox_norm": {"x1": 0.1677, "y1": 0.2057, "x2": 0.2657, "y2": 0.2165}, "text": "LOGISITICS", "occurrence_index": 0},{"bbox_norm": {"x1": 0.2726, "y1": 0.2057, "x2": 0.3023, "y2": 0.2165}, "text": "LTD", "occurrence_index": 0}]```'
print(raw_text)
result = safe_extract_json(raw_text)
if result is None or not isinstance(result, dict):
......@@ -207,10 +314,16 @@ if result is None or not isinstance(result, dict):
cleaned_dir = os.path.join("./output")
cleaned_first = os.path.join(cleaned_dir, "cleaned_page_1.png")
debug_first = os.path.join(cleaned_dir, "debug_page_1.png")
draw_debug_boxes(image_paths[0], result, debug_first)
erase_regions_on_image(image_paths[0], result, cleaned_first)
coords_map = convert_ai_json_to_coords_map(result, img_w, img_h)
if not coords_map:
raise RuntimeError("无法从AI返回中提取矩形框坐标,请检查输出格式或提示词。")
print(f"解析并统一后的坐标字典: {coords_map}")
draw_debug_boxes(image_paths[0], coords_map, debug_first)
erase_regions_on_image(image_paths[0], coords_map, cleaned_first)
# 合成PDF:第一页使用清理后的图片,其余页沿用原图
final_images = [cleaned_first] + image_paths[1:]
images_to_pdf(final_images, os.path.join(cleaned_dir, "cleaned.pdf"))
end_time = time.time()
print(f"耗时: {end_time - begin_time} 秒")
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论