提交 571454e3 authored 作者: 贺阳's avatar 贺阳

同步的检查

上级 40a9aa79
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
<field name="file" string="PDF File" required="1" filename="attachment_name"/> <field name="file" string="PDF File" required="1" filename="attachment_name"/>
<field name="is_upload" readonly="1"/> <field name="is_upload" readonly="1"/>
<field name="upload_time"/> <field name="upload_time"/>
<field name="create_date" optional="show"/>
<button name="action_sync" string="SyncTo.." type="object" icon="fa-upload"/> <button name="action_sync" string="SyncTo.." type="object" icon="fa-upload"/>
<field name="attachment_name" invisible="1"/> <field name="attachment_name" invisible="1"/>
</tree> </tree>
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import base64 import base64
import requests import requests
from dashscope import ImageSynthesis import json
import dashscope
import logging import logging
from http import HTTPStatus from http import HTTPStatus
from openai import OpenAI
from PIL import Image, ImageDraw
import io
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
# 设置DashScope的API地址
dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'
class AIImageEditService: class AIImageEditService:
"""AI图片编辑服务 - 使用阿里云百炼的wan2.5-i2i-preview模型""" """AI图片编辑服务 - 使用坐标检测方式移除文字"""
def __init__(self, api_key='sk-e41914f0d9c94035a5ae1322e9a61fb1'): def __init__(self, api_key='sk-e41914f0d9c94035a5ae1322e9a61fb1'):
self.api_key = api_key self.api_key = api_key
self.model = "wan2.5-i2i-preview" # 使用OpenAI客户端连接阿里云百炼
# 设置DashScope的API密钥 self.client = OpenAI(
dashscope.api_key = self.api_key api_key=api_key,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
def edit_image_remove_text(self, image_base64, text_to_remove="AGN UCLINK LOGISITICS LTD"): def edit_image_remove_text(self, image_base64, text_to_remove="AGN UCLINK LOGISITICS LTD"):
""" """
使用AI模型移除图片中的指定文字 使用坐标检测方式移除图片中的指定文字
:param image_base64: 图片的base64编码 :param image_base64: 图片的base64编码
:param text_to_remove: 要移除的文字 :param text_to_remove: 要移除的文字
:return: 处理后的图片base64编码,失败返回None :return: 处理后的图片base64编码,失败返回None
...@@ -31,84 +32,67 @@ class AIImageEditService: ...@@ -31,84 +32,67 @@ class AIImageEditService:
import time import time
start_time = time.time() start_time = time.time()
_logger.info(f"开始AI图片编辑,目标文字: {text_to_remove}") _logger.info(f"开始AI坐标检测,目标文字: {text_to_remove}")
try: try:
# 根据要移除的文字构建不同的提示词,强调保持清晰度 # 解码图片获取尺寸
image_data = base64.b64decode(image_base64)
image = Image.open(io.BytesIO(image_data))
img_w, img_h = image.size
_logger.info(f"图片尺寸: {img_w}x{img_h} 像素")
# 构建坐标检测提示词
if "AGN" in text_to_remove and "UCLINK" in text_to_remove: if "AGN" in text_to_remove and "UCLINK" in text_to_remove:
prompt = "Remove the text 'AGN' and 'UCLINK LOGISITICS LTD' from the image completely, maintain perfect background consistency, preserve maximum image clarity and sharpness, high resolution, crisp details, professional quality, no blur, no pixelation, maintain original image quality" prompt_text = f"图像分辨率为{img_w}x{img_h}像素。坐标系定义:以原始图像左上角为原点(0,0),x向右增加,y向下增加;不要使用任何预处理(缩放或加黑边)产生的坐标。请仅返回这两个文本的矩形框坐标,且必须是归一化到[0,1]的浮点数(相对于原始图像宽高),返回格式严格为压缩JSON、无任何解释:{{\"AGN\": [x1_rel, y1_rel, x2_rel, y2_rel], \"UCLINK LOGISITICS LTD\": [x3_rel, y3_rel, x4_rel, y4_rel]}}。"
elif "AGN" in text_to_remove: elif "AGN" in text_to_remove:
prompt = "Remove the text 'AGN' from the image completely, maintain perfect background consistency, preserve maximum image clarity and sharpness, high resolution, crisp details, professional quality, no blur, no pixelation, maintain original image quality" prompt_text = f"图像分辨率为{img_w}x{img_h}像素。坐标系定义:以原始图像左上角为原点(0,0),x向右增加,y向下增加;请仅返回'AGN'文本的矩形框坐标,且必须是归一化到[0,1]的浮点数(相对于原始图像宽高),返回格式严格为压缩JSON、无任何解释:{{\"AGN\": [x1_rel, y1_rel, x2_rel, y2_rel]}}。"
elif "UCLINK" in text_to_remove: elif "UCLINK" in text_to_remove:
prompt = "Remove the text 'UCLINK LOGISITICS LTD' from the image completely, maintain perfect background consistency, preserve maximum image clarity and sharpness, high resolution, crisp details, professional quality, no blur, no pixelation, maintain original image quality" prompt_text = f"图像分辨率为{img_w}x{img_h}像素。坐标系定义:以原始图像左上角为原点(0,0),x向右增加,y向下增加;请仅返回'UCLINK LOGISITICS LTD'文本的矩形框坐标,且必须是归一化到[0,1]的浮点数(相对于原始图像宽高),返回格式严格为压缩JSON、无任何解释:{{\"UCLINK LOGISITICS LTD\": [x1_rel, y1_rel, x2_rel, y2_rel]}}。"
else: else:
prompt = f"Remove the text '{text_to_remove}' from the image completely, maintain perfect background consistency, preserve maximum image clarity and sharpness, high resolution, crisp details, professional quality, no blur, no pixelation, maintain original image quality" prompt_text = f"图像分辨率为{img_w}x{img_h}像素。坐标系定义:以原始图像左上角为原点(0,0),x向右增加,y向下增加;请仅返回'{text_to_remove}'文本的矩形框坐标,且必须是归一化到[0,1]的浮点数(相对于原始图像宽高),返回格式严格为压缩JSON、无任何解释:{{\"{text_to_remove}\": [x1_rel, y1_rel, x2_rel, y2_rel]}}。"
# 调用ImageSynthesis模型,使用更高质量的设置 # 调用坐标检测API
api_start_time = time.time() api_start_time = time.time()
rsp = ImageSynthesis.call( completion = self.client.chat.completions.create(
api_key=self.api_key, # 显式传递API密钥 model="qwen3-vl-plus",
model=self.model, messages=[
prompt=prompt, {
images=[f"data:image/png;base64,{image_base64}"], "role": "user",
negative_prompt="blurry, unclear, low quality, text, letters, numbers, noise, distortion, pixelated, compressed, low resolution, fuzzy, grainy, soft, out of focus, low detail, poor quality, artifacts, compression artifacts, jpeg artifacts, lossy compression", "content": [
n=1, {
watermark=False, "type": "image_url",
seed=12345, "image_url": {
# 移除size参数,让AI使用原始图片尺寸 "url": f"data:image/png;base64,{image_base64}"
# size="2048*2048", # 这个参数格式不正确,导致API错误 },
quality="hd", # 使用高清质量 },
# 添加更多质量参数 {"type": "text", "text": prompt_text},
style="photographic", # 使用摄影风格保持清晰度 ],
enhance_prompt=True, # 增强提示词效果 },
],
) )
api_end_time = time.time() api_end_time = time.time()
api_processing_time = api_end_time - api_start_time api_processing_time = api_end_time - api_start_time
_logger.info(f"AI API调用耗时: {api_processing_time:.2f}秒") _logger.info(f"AI坐标检测耗时: {api_processing_time:.2f}秒")
if rsp.status_code == HTTPStatus.OK: # 解析返回的坐标
# 检查返回结果结构 raw_text = completion.choices[0].message.content
_logger.info(f"API响应结构: {rsp}") _logger.info(f"AI返回的坐标信息: {raw_text}")
# 检查任务状态 coords_map = self._safe_extract_json(raw_text)
if hasattr(rsp, 'output') and hasattr(rsp.output, 'task_status'): if coords_map is None or not isinstance(coords_map, dict):
task_status = rsp.output.task_status _logger.error("AI返回内容无法解析为JSON坐标")
_logger.info(f"AI任务状态: {task_status}")
if task_status == "FAILED":
error_code = getattr(rsp.output, 'code', 'Unknown')
error_message = getattr(rsp.output, 'message', 'Unknown error')
_logger.error(f"AI任务失败 - 错误码: {error_code}, 错误信息: {error_message}")
return None
elif task_status != "SUCCEEDED":
_logger.warning(f"AI任务状态异常: {task_status}")
return None return None
# 安全地获取处理后图片的URL # 使用坐标信息涂抹文字
try: edited_image_base64 = self._erase_regions_on_image(image, coords_map, img_w, img_h)
if hasattr(rsp, 'output') and hasattr(rsp.output, 'results') and len(rsp.output.results) > 0:
image_url = rsp.output.results[0].url
_logger.info(f"AI图片编辑成功,图片URL: {image_url}")
# 下载图片并转换为base64
edited_image_base64 = self.download_and_convert_to_base64(image_url)
total_time = time.time() - start_time total_time = time.time() - start_time
_logger.info(f"AI图片编辑总耗时: {total_time:.2f}秒") _logger.info(f"AI坐标检测和涂抹总耗时: {total_time:.2f}秒")
return edited_image_base64 return edited_image_base64
else:
_logger.error(f"API返回结果结构异常: {rsp}")
return None
except (IndexError, AttributeError) as e:
_logger.error(f"解析API返回结果失败: {str(e)}")
_logger.error(f"完整响应: {rsp}")
return None
else:
_logger.error(f"AI图片编辑失败,HTTP返回码:{rsp.status_code}")
_logger.error(f"错误码:{rsp.code}")
_logger.error(f"错误信息:{rsp.message}")
return None
except Exception as e: except Exception as e:
_logger.error(f"AI图片编辑异常: {str(e)}") _logger.error(f"AI坐标检测异常: {str(e)}")
return None return None
def download_and_convert_to_base64(self, image_url, timeout=300): def download_and_convert_to_base64(self, image_url, timeout=300):
...@@ -132,3 +116,103 @@ class AIImageEditService: ...@@ -132,3 +116,103 @@ class AIImageEditService:
_logger.error(f"图片下载失败: {str(e)}") _logger.error(f"图片下载失败: {str(e)}")
return None return None
def _safe_extract_json(self, text: str):
"""从模型返回文本中尽可能鲁棒地提取JSON对象。"""
# 直接尝试解析
try:
return json.loads(text)
except Exception:
pass
# 尝试提取首尾花括号之间的内容
start = text.find('{')
end = text.rfind('}')
if start != -1 and end != -1 and end > start:
candidate = text[start:end+1]
try:
return json.loads(candidate)
except Exception:
# 将单引号替换为双引号再试一次(有些模型会返回单引号JSON样式)
candidate2 = candidate.replace("'", '"')
try:
return json.loads(candidate2)
except Exception:
return None
return None
def _normalize_bbox(self, bbox, img_w, img_h):
"""
规范化bbox为像素坐标 [x1,y1,x2,y2],并保证边界有效。
支持:
- 像素坐标;
- 归一化坐标(0-1);
- [x1,y1,w,h] 形式(若x2<=x1或y2<=y1则按宽高处理)。
"""
if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
return None
x1, y1, x2, y2 = bbox
# 如果可能是归一化坐标
if 0 <= x1 <= 1 and 0 <= y1 <= 1 and 0 <= x2 <= 1 and 0 <= y2 <= 1:
x1 = int(round(x1 * img_w))
y1 = int(round(y1 * img_h))
x2 = int(round(x2 * img_w))
y2 = int(round(y2 * img_h))
else:
# 像素坐标或 [x1,y1,w,h]
x1 = int(round(x1))
y1 = int(round(y1))
x2 = int(round(x2))
y2 = int(round(y2))
# 如果x2<=x1或y2<=y1,尝试将其视作宽高
if x2 <= x1 or y2 <= y1:
x2 = x1 + max(1, x2)
y2 = y1 + max(1, y2)
# 修正边界并加上少量边距
if x1 > x2:
x1, x2 = x2, x1
if y1 > y2:
y1, y2 = y2, y1
# 边距按短边的2%(最少5px,最多30px)
short_side = min(img_w, img_h)
margin = max(5, min(30, int(0.02 * short_side)))
x1 = max(0, x1 - margin)
y1 = max(0, y1 - margin)
x2 = min(img_w - 1, x2 + margin)
y2 = min(img_h - 1, y2 + margin)
return [x1, y1, x2, y2]
def _erase_regions_on_image(self, image, coords_map, img_w, img_h):
    """
    Paint white rectangles over the given regions of a PIL image and
    return the result as a base64-encoded PNG string (None on failure).
    """
    try:
        rgb = image.convert('RGB')
        canvas = ImageDraw.Draw(rgb)
        for key, bbox in coords_map.items():
            box = self._normalize_bbox(bbox, img_w, img_h)
            if box is None:
                _logger.warning(f"坐标格式错误,跳过 {key}: {bbox}")
                continue
            # White fill hides the detected text region.
            canvas.rectangle(box, fill=(255, 255, 255))
            _logger.info(f"已抹除 {key} 区域: {box}")
        # Serialize the edited image back to base64 PNG.
        buffer = io.BytesIO()
        rgb.save(buffer, format='PNG')
        encoded = base64.b64encode(buffer.getvalue()).decode('utf-8')
        _logger.info("清理后的图片已处理完成")
        return encoded
    except Exception as e:
        _logger.error(f"涂抹区域失败: {str(e)}")
        return None
...@@ -19,82 +19,6 @@ class BatchGetPodInfoWizard(models.TransientModel): ...@@ -19,82 +19,6 @@ class BatchGetPodInfoWizard(models.TransientModel):
_name = 'batch.get.pod.info.wizard' _name = 'batch.get.pod.info.wizard'
_description = 'Batch Get POD Info Wizard' # 批量获取POD信息向导 _description = 'Batch Get POD Info Wizard' # 批量获取POD信息向导
@api.model
def cron_cleanup_temp_attachments(self):
    """
    Scheduled cleanup of temporary attachments generated by this wizard.

    Intended to run daily at 08:00; deletes wizard attachments created
    before the end of the previous day, removing the filestore file
    (best-effort) and then the database records.
    """
    import os
    from odoo.tools import config
    try:
        # Cutoff = previous day 23:59:59.
        today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        one_day_ago = today - timedelta(seconds=1)
        _logger.info(f"开始执行定时清理临时附件任务,清理时间点: {one_day_ago.strftime('%Y-%m-%d %H:%M:%S')}")
        _logger.info("使用SQL查询查找临时附件")
        # Parameterized query instead of % string interpolation: safer,
        # and lets the driver handle datetime formatting.
        self.env.cr.execute(
            """
            SELECT id, name, res_model, res_id, create_date, store_fname
            FROM ir_attachment
            WHERE res_model = 'batch.get.pod.info.wizard'
            AND create_date < %s
            ORDER BY create_date DESC
            """,
            (one_day_ago,),
        )
        sql_results = self.env.cr.fetchall()
        if not sql_results:
            _logger.info(f"没有找到{one_day_ago.strftime('%Y-%m-%d')}之前创建的临时附件需要清理")
            return
        attachment_ids = [row[0] for row in sql_results]
        temp_attachments = self.env['ir.attachment'].sudo().browse(attachment_ids)
        attachment_count = len(temp_attachments)
        attachment_names = [att.name for att in temp_attachments]
        _logger.info(f"找到 {attachment_count} 个{one_day_ago.strftime('%Y-%m-%d')}之前创建的临时附件,开始清理")
        # Filestore root is invariant across attachments; resolve it once.
        data_dir = config.filestore(self.env.cr.dbname)
        for attachment in temp_attachments:
            try:
                # Prefer store_fname (Odoo 12+), then legacy datas_fname,
                # finally fall back to the attachment name.
                file_path = (getattr(attachment, 'store_fname', None)
                             or getattr(attachment, 'datas_fname', None)
                             or attachment.name)
                if data_dir and file_path:
                    full_path = os.path.join(data_dir, file_path)
                    if os.path.exists(full_path):
                        os.remove(full_path)
                        _logger.info(f"已删除物理文件: {full_path}")
                    else:
                        _logger.warning(f"物理文件不存在: {full_path}")
            except Exception as file_e:
                _logger.warning(f"删除物理文件失败 {attachment.name}: {str(file_e)}")
        # Remove the DB records only after attempting file removal.
        temp_attachments.unlink()
        _logger.info(f"定时清理完成,共删除 {attachment_count} 个{one_day_ago.strftime('%Y-%m-%d')}之前创建的临时附件: {', '.join(attachment_names[:5])}{'...' if len(attachment_names) > 5 else ''}")
    except Exception as e:
        _logger.error(f"定时清理临时附件失败: {str(e)}")
def get_order(self): def get_order(self):
""" """
得到单据 得到单据
...@@ -164,7 +88,7 @@ class BatchGetPodInfoWizard(models.TransientModel): ...@@ -164,7 +88,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
# 如果启用了涂抹文字,进行处理 # 如果启用了涂抹文字,进行处理
if self.remove_specified_text and processed_files: if self.remove_specified_text and processed_files:
processed_files = self._remove_specified_text(processed_files, debug_mode=False) # processed_files = self._remove_specified_text(processed_files, debug_mode=False)
# 合并PDF并保存到pdf_file字段 # 合并PDF并保存到pdf_file字段
self._merge_pdf_files(processed_files) self._merge_pdf_files(processed_files)
...@@ -290,10 +214,16 @@ class BatchGetPodInfoWizard(models.TransientModel): ...@@ -290,10 +214,16 @@ class BatchGetPodInfoWizard(models.TransientModel):
# 同步推送匹配节点 # 同步推送匹配节点
if self.sync_match_node and processed_files: if self.sync_match_node and processed_files:
logging.info(f"同步推送匹配节点,共 {len(processed_files)} 个文件") logging.info(f"同步推送匹配节点,共 {len(processed_files)} 个文件")
self.get_date_sync_match_node(processed_files) #且需先对比小包当前节点的操作时间是否小于提取时间(同时区对比)若大于则不能推送,
# 若需补推节点,则需判断提取时间-写入节点(不取写入第一个节点)的前序间隔时间是否大于小包当前节点的操作时间。
# 若不满足以上条件,则不执行生成和自动推送节点,并在小包上新增推送备注(新增该字段)回写备注信息:获取尾程POD,自动推送节点失败,有风险产生倒挂。请手动操作205-10-20 10:20:20(获取时间)
valid_files = self._validate_node_push_conditions(processed_files)
if valid_files:
self.get_date_sync_match_node(valid_files)
else:
_logger.info(f"没有满足条件的文件,不执行生成和自动推送节点")
# 清理所有临时文件(包括数据库记录和物理文件) # 清理所有临时文件(包括数据库记录和物理文件)
self._cleanup_all_temp_files(bl_objs) self._cleanup_all_temp_files(bl_objs)
end_time = time.time() end_time = time.time()
...@@ -309,6 +239,15 @@ class BatchGetPodInfoWizard(models.TransientModel): ...@@ -309,6 +239,15 @@ class BatchGetPodInfoWizard(models.TransientModel):
'context': {'default_show_error_message': self.show_error_message,'active_id': bl_objs.ids} 'context': {'default_show_error_message': self.show_error_message,'active_id': bl_objs.ids}
} }
def _validate_node_push_conditions(self, processed_files):
    """
    Validate which processed files are eligible for node push.

    NOTE(review): currently a pass-through stub — the time-comparison
    rules described at the call site (parcel node operation time vs.
    extraction time, back-fill interval checks, failure remark
    write-back) are not implemented yet. TODO: implement and confirm
    the timezone handling with the caller.

    :param processed_files: list of processed file entries
    :return: list of files that satisfy the push conditions
    """
    return processed_files
# 写一个方法调接口获取提单pdf文件 # 写一个方法调接口获取提单pdf文件
def _get_pdf_file_arr(self): def _get_pdf_file_arr(self):
""" """
...@@ -2033,3 +1972,79 @@ class BatchGetPodInfoWizard(models.TransientModel): ...@@ -2033,3 +1972,79 @@ class BatchGetPodInfoWizard(models.TransientModel):
_logger.error(f"反序列化processed_files失败: {str(e)}") _logger.error(f"反序列化processed_files失败: {str(e)}")
return [] return []
@api.model
def cron_cleanup_temp_attachments(self):
    """
    Scheduled cleanup of temporary attachments generated by this wizard.

    Intended to run daily at 08:00; deletes wizard attachments created
    before the end of the previous day, removing the filestore file
    (best-effort) and then the database records.
    """
    import os
    from odoo.tools import config
    try:
        # Cutoff = previous day 23:59:59.
        today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        one_day_ago = today - timedelta(seconds=1)
        _logger.info(f"开始执行定时清理临时附件任务,清理时间点: {one_day_ago.strftime('%Y-%m-%d %H:%M:%S')}")
        _logger.info("使用SQL查询查找临时附件")
        # Parameterized query instead of % string interpolation: safer,
        # and lets the driver handle datetime formatting.
        self.env.cr.execute(
            """
            SELECT id, name, res_model, res_id, create_date, store_fname
            FROM ir_attachment
            WHERE res_model = 'batch.get.pod.info.wizard'
            AND create_date < %s
            ORDER BY create_date DESC
            """,
            (one_day_ago,),
        )
        sql_results = self.env.cr.fetchall()
        if not sql_results:
            _logger.info(f"没有找到{one_day_ago.strftime('%Y-%m-%d')}之前创建的临时附件需要清理")
            return
        attachment_ids = [row[0] for row in sql_results]
        temp_attachments = self.env['ir.attachment'].sudo().browse(attachment_ids)
        attachment_count = len(temp_attachments)
        attachment_names = [att.name for att in temp_attachments]
        _logger.info(f"找到 {attachment_count} 个{one_day_ago.strftime('%Y-%m-%d')}之前创建的临时附件,开始清理")
        # Filestore root is invariant across attachments; resolve it once.
        data_dir = config.filestore(self.env.cr.dbname)
        for attachment in temp_attachments:
            try:
                # Prefer store_fname (Odoo 12+), then legacy datas_fname,
                # finally fall back to the attachment name.
                file_path = (getattr(attachment, 'store_fname', None)
                             or getattr(attachment, 'datas_fname', None)
                             or attachment.name)
                if data_dir and file_path:
                    full_path = os.path.join(data_dir, file_path)
                    if os.path.exists(full_path):
                        os.remove(full_path)
                        _logger.info(f"已删除物理文件: {full_path}")
                    else:
                        _logger.warning(f"物理文件不存在: {full_path}")
            except Exception as file_e:
                _logger.warning(f"删除物理文件失败 {attachment.name}: {str(file_e)}")
        # Remove the DB records only after attempting file removal.
        temp_attachments.unlink()
        _logger.info(f"定时清理完成,共删除 {attachment_count} 个{one_day_ago.strftime('%Y-%m-%d')}之前创建的临时附件: {', '.join(attachment_names[:5])}{'...' if len(attachment_names) > 5 else ''}")
    except Exception as e:
        _logger.error(f"定时清理临时附件失败: {str(e)}")
import os
from openai import OpenAI
import requests
import mimetypes
import base64
import fitz # PyMuPDF
import json
from PIL import Image, ImageDraw
# OpenAI-compatible client for Alibaba Cloud Model Studio (DashScope).
client = OpenAI(
    # If no environment variable is configured, replace the line below with
    # your Model Studio API key: api_key="sk-xxx".
    # Singapore and Beijing regions use different API keys.
    # NOTE(review): hardcoded API key — move to config/env and rotate it.
    api_key='sk-e41914f0d9c94035a5ae1322e9a61fb1',
    # Beijing-region base_url; for the Singapore region use:
    # https://dashscope-intl.aliyuncs.com/compatible-mode/v1
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
# Local test input — presumably a developer workstation path; adjust per machine.
pdf_path = "C:/Users/Administrator/Desktop/43610236590 (3).pdf"
def pdf_to_images(pdf_path, output_dir='./pdf_pages', dpi=150):
    """
    Render each page of a PDF to a PNG image.

    :param pdf_path: path to the PDF file
    :param output_dir: output directory (default ./pdf_pages)
    :param dpi: render resolution (default 150)
    :return: list of generated image paths
    """
    os.makedirs(output_dir, exist_ok=True)
    document = fitz.open(pdf_path)
    paths = []
    # 72 dpi is the PDF default; scale relative to it.
    zoom = fitz.Matrix(dpi / 72, dpi / 72)
    for index in range(len(document)):
        pixmap = document.load_page(index).get_pixmap(matrix=zoom)
        target = os.path.join(output_dir, f"page_{index + 1}.png")
        pixmap.save(target)
        paths.append(target)
    document.close()
    return paths
def download_image(image_url, save_path='output.png'):
    """Download an image over HTTP to a local file (streaming, 300s timeout)."""
    try:
        response = requests.get(image_url, stream=True, timeout=300)
        response.raise_for_status()  # raise on non-2xx statuses
        with open(save_path, 'wb') as handle:
            for piece in response.iter_content(chunk_size=8192):
                handle.write(piece)
        print(f"图像已成功下载到: {save_path}")
    except requests.exceptions.RequestException as e:
        print(f"图像下载失败: {e}")
def encode_file(file_path):
    """Encode an image file as a base64 data URL; raise for non-image paths."""
    mime_type, _ = mimetypes.guess_type(file_path)
    if not mime_type or not mime_type.startswith("image/"):
        raise ValueError("不支持或无法识别的图像格式")
    with open(file_path, "rb") as handle:
        payload = base64.b64encode(handle.read()).decode('utf-8')
    return f"data:{mime_type};base64,{payload}"
# Render the PDF and encode the first page for the vision model.
image_paths = pdf_to_images(pdf_path)
image_base64 = encode_file(image_paths[0])
# Get the page resolution; the prompt asks the model to return coordinates
# normalized to [0, 1] relative to this width/height.
img_w, img_h = Image.open(image_paths[0]).size
print(f"页面尺寸: {img_w}x{img_h} 像素")
def safe_extract_json(text: str):
    """Best-effort extraction of a JSON object from model output text.

    Handles raw JSON, JSON wrapped in markdown code fences
    (```json ... ``` or ``` ... ```), and single-quoted pseudo-JSON.

    :param text: raw model reply
    :return: parsed object, or None when nothing parses
    """
    if not text:
        return None
    cleaned = text.strip()
    # Strip a markdown code fence if present; only drop the closing
    # ``` when it actually exists so valid characters are never lost.
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        if cleaned.lower().startswith("json"):
            cleaned = cleaned[4:]
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        cleaned = cleaned.strip()
    # Direct parse first.
    try:
        return json.loads(cleaned)
    except Exception:
        pass
    # Fall back to the outermost {...} span.
    start = cleaned.find('{')
    end = cleaned.rfind('}')
    if start != -1 and end > start:
        candidate = cleaned[start:end + 1]
        try:
            return json.loads(candidate)
        except Exception:
            # Some models emit single-quoted JSON-like text; retry once.
            try:
                return json.loads(candidate.replace("'", '"'))
            except Exception:
                return None
    return None
def normalize_bbox(bbox, img_w, img_h):
    """
    Convert a bbox into valid pixel coordinates [x1, y1, x2, y2].

    Accepts pixel coordinates, normalized (0-1) coordinates, and the
    [x, y, w, h] form (used when x2<=x1 or y2<=y1). A small margin is
    added and the box is clamped to the image bounds.
    """
    if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
        return None
    a, b, c, d = bbox
    if all(0 <= v <= 1 for v in (a, b, c, d)):
        # Looks like relative coordinates: scale up to pixels.
        left = int(round(a * img_w))
        top = int(round(b * img_h))
        right = int(round(c * img_w))
        bottom = int(round(d * img_h))
    else:
        # Pixel coordinates, or possibly [x1, y1, w, h].
        left = int(round(a))
        top = int(round(b))
        right = int(round(c))
        bottom = int(round(d))
        # Degenerate box: treat the last two values as width/height.
        if right <= left or bottom <= top:
            right = left + max(1, right)
            bottom = top + max(1, bottom)
    # Ensure proper corner ordering.
    if left > right:
        left, right = right, left
    if top > bottom:
        top, bottom = bottom, top
    # Pad by 2% of the short side, bounded to [5, 30] pixels.
    pad = max(5, min(30, int(0.02 * min(img_w, img_h))))
    left = max(0, left - pad)
    top = max(0, top - pad)
    right = min(img_w - 1, right + pad)
    bottom = min(img_h - 1, bottom + pad)
    return [left, top, right, bottom]
def erase_regions_on_image(image_path: str, coords_map: dict, save_path: str):
    """Cover each mapped region with a white rectangle and save the result.

    :param image_path: source image file
    :param coords_map: {label: bbox} — bboxes handled by normalize_bbox
    :param save_path: destination file for the cleaned image
    """
    img = Image.open(image_path).convert('RGB')
    draw = ImageDraw.Draw(img)
    w, h = img.size
    for key, bbox in coords_map.items():
        nb = normalize_bbox(bbox, w, h)
        if nb is None:
            print(f"坐标格式错误,跳过 {key}: {bbox}")
            continue
        x1, y1, x2, y2 = nb
        draw.rectangle([x1, y1, x2, y2], fill=(255, 255, 255))
        print(f"已抹除 {key} 区域: {nb}")
    # os.makedirs("") raises FileNotFoundError; only create a real directory
    # (fixes saving to a bare filename in the current directory).
    parent = os.path.dirname(save_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    img.save(save_path)
    print(f"清理后的图片已保存: {save_path}")
def draw_debug_boxes(image_path: str, coords_map: dict, save_path: str):
    """Draw the predicted rectangles (with labels) on the image for manual review.

    :param image_path: source image file
    :param coords_map: {label: bbox} — bboxes handled by normalize_bbox
    :param save_path: destination file for the annotated image
    """
    img = Image.open(image_path).convert('RGB')
    draw = ImageDraw.Draw(img)
    w, h = img.size
    for key, bbox in coords_map.items():
        nb = normalize_bbox(bbox, w, h)
        if nb is None:
            print(f"跳过无法解析的坐标: {key} -> {bbox}")
            continue
        x1, y1, x2, y2 = nb
        draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 0), width=3)
        # Label just above the box, clamped to the top edge.
        draw.text((x1, max(0, y1 - 16)), key, fill=(255, 0, 0))
    # os.makedirs("") raises FileNotFoundError; only create a real directory.
    parent = os.path.dirname(save_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    img.save(save_path)
    print(f"调试框已生成: {save_path}")
def images_to_pdf(image_paths, output_pdf):
    """Merge a list of images into one multi-page PDF.

    :param image_paths: ordered image files, one per page
    :param output_pdf: destination PDF path
    :raises RuntimeError: when image_paths is empty
    """
    # Validate before opening anything (and before creating directories).
    if not image_paths:
        raise RuntimeError("没有需要写入PDF的图片")
    # os.makedirs("") raises FileNotFoundError; only create a real directory.
    parent = os.path.dirname(output_pdf)
    if parent:
        os.makedirs(parent, exist_ok=True)
    pages = [Image.open(p).convert('RGB') for p in image_paths]
    pages[0].save(output_pdf, save_all=True, append_images=pages[1:])
    print(f"已生成PDF: {output_pdf}")
# Ask the vision model for normalized bounding boxes of the target text.
completion = client.chat.completions.create(
    model="qwen3-vl-plus",  # qwen3-vl-plus as an example; swap the model as needed. Model list: https://help.aliyun.com/zh/model-studio/models
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": image_base64
                    },
                },
                {"type": "text", "text": f"图像分辨率为{img_w}x{img_h}像素。坐标系定义:以原始图像左上角为原点(0,0),x向右增加,y向下增加;不要使用任何预处理(缩放或加黑边)产生的坐标。请仅返回这两个文本的矩形框坐标,且必须是归一化到[0,1]的浮点数(相对于原始图像宽高),返回格式严格为压缩JSON、无任何解释:{{\"AGN\": [x1_rel, y1_rel, x2_rel, y2_rel], \"UCLINK LOGISITICS LTD\": [x3_rel, y3_rel, x4_rel, y4_rel]}}。"},
            ],
        },
    ],
    temperature=0.1,
)
raw_text = completion.choices[0].message.content
print(raw_text)
# Parse the reply; abort if it is not a JSON object of coordinates.
result = safe_extract_json(raw_text)
if result is None or not isinstance(result, dict):
    raise RuntimeError("模型返回内容无法解析为JSON坐标,请检查返回格式。")
# Only process the first page: write the erased image to
# output/cleaned_page_1.png, then rebuild the PDF.
cleaned_dir = os.path.join("./output")
cleaned_first = os.path.join(cleaned_dir, "cleaned_page_1.png")
debug_first = os.path.join(cleaned_dir, "debug_page_1.png")
draw_debug_boxes(image_paths[0], result, debug_first)
erase_regions_on_image(image_paths[0], result, cleaned_first)
# Compose the PDF: cleaned image for page 1, original images for the rest.
final_images = [cleaned_first] + image_paths[1:]
images_to_pdf(final_images, os.path.join(cleaned_dir, "cleaned.pdf"))
import os
from openai import OpenAI
import requests
import mimetypes
import base64
import fitz # PyMuPDF
import json
from PIL import Image, ImageDraw
# OpenAI-compatible client for Alibaba Cloud Model Studio (DashScope).
client = OpenAI(
    # If no environment variable is configured, replace the line below with
    # your Model Studio API key: api_key="sk-xxx".
    # Singapore and Beijing regions use different API keys.
    # NOTE(review): hardcoded API key — move to config/env and rotate it.
    api_key='sk-e41914f0d9c94035a5ae1322e9a61fb1',
    # Beijing-region base_url; for the Singapore region use:
    # https://dashscope-intl.aliyuncs.com/compatible-mode/v1
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
# Local test input — presumably a developer workstation path; adjust per machine.
# pdf_path = "C:/Users/Administrator/Desktop/43610236590 (3).pdf"
pdf_path = "C:/Users/Administrator/Desktop/43610281036.pdf"
def pdf_to_images(pdf_path, output_dir='./pdf_pages', dpi=150):
    """
    Render each page of a PDF to a PNG image.

    :param pdf_path: path to the PDF file
    :param output_dir: output directory (default ./pdf_pages)
    :param dpi: render resolution (default 150)
    :return: list of generated image paths
    """
    os.makedirs(output_dir, exist_ok=True)
    document = fitz.open(pdf_path)
    paths = []
    # 72 dpi is the PDF default; scale relative to it.
    zoom = fitz.Matrix(dpi / 72, dpi / 72)
    for index in range(len(document)):
        pixmap = document.load_page(index).get_pixmap(matrix=zoom)
        target = os.path.join(output_dir, f"page_{index + 1}.png")
        pixmap.save(target)
        paths.append(target)
    document.close()
    return paths
def download_image(image_url, save_path='output.png'):
    """Download an image over HTTP to a local file (streaming, 300s timeout)."""
    try:
        response = requests.get(image_url, stream=True, timeout=300)
        response.raise_for_status()  # raise on non-2xx statuses
        with open(save_path, 'wb') as handle:
            for piece in response.iter_content(chunk_size=8192):
                handle.write(piece)
        print(f"图像已成功下载到: {save_path}")
    except requests.exceptions.RequestException as e:
        print(f"图像下载失败: {e}")
def encode_file(file_path):
    """Encode an image file as a base64 data URL; raise for non-image paths."""
    mime_type, _ = mimetypes.guess_type(file_path)
    if not mime_type or not mime_type.startswith("image/"):
        raise ValueError("不支持或无法识别的图像格式")
    with open(file_path, "rb") as handle:
        payload = base64.b64encode(handle.read()).decode('utf-8')
    return f"data:{mime_type};base64,{payload}"
# Render the PDF and encode the first page for the vision model.
image_paths = pdf_to_images(pdf_path)
image_base64 = encode_file(image_paths[0])
# Get the page resolution; the prompt asks the model to return coordinates
# normalized to [0, 1] relative to this width/height.
img_w, img_h = Image.open(image_paths[0]).size
print(f"页面尺寸: {img_w}x{img_h} 像素")
def safe_extract_json(text: str):
    """Best-effort extraction of a JSON object from model output text.

    Strips markdown code fences, parses JSON, and wraps a top-level
    list as {'rects': [...]} so callers always receive a dict-like
    result. Falls back to the outermost {...} span; returns None when
    nothing parses.
    """
    if not text:
        return None
    cleaned = text.strip()
    # Robust fence stripping: the old text[7:-3] slice silently chopped
    # three valid characters when no closing ``` was present.
    if cleaned.startswith("```"):
        cleaned = cleaned[3:]
        if cleaned.lower().startswith("json"):
            cleaned = cleaned[4:]
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        cleaned = cleaned.strip()
    try:
        obj = json.loads(cleaned)
        # Normalize a bare list into the {"rects": [...]} shape.
        return {'rects': obj} if isinstance(obj, list) else obj
    except Exception:
        pass
    # Fall back to the outermost {...} span.
    start = cleaned.find('{')
    end = cleaned.rfind('}')
    if start != -1 and end > start:
        candidate = cleaned[start:end + 1]
        try:
            return json.loads(candidate)
        except Exception:
            # Some models emit single-quoted JSON-like text; retry once.
            try:
                return json.loads(candidate.replace("'", '"'))
            except Exception:
                return None
    return None
def normalize_bbox(bbox, img_w, img_h):
    """
    Convert a bbox into valid pixel coordinates [x1, y1, x2, y2].

    Accepts pixel coordinates, normalized (0-1) coordinates, and the
    [x, y, w, h] form (used when x2<=x1 or y2<=y1). A small margin is
    added and the box is clamped to the image bounds.
    """
    if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
        return None
    a, b, c, d = bbox
    if all(0 <= v <= 1 for v in (a, b, c, d)):
        # Looks like relative coordinates: scale up to pixels.
        left = int(round(a * img_w))
        top = int(round(b * img_h))
        right = int(round(c * img_w))
        bottom = int(round(d * img_h))
    else:
        # Pixel coordinates, or possibly [x1, y1, w, h].
        left = int(round(a))
        top = int(round(b))
        right = int(round(c))
        bottom = int(round(d))
        # Degenerate box: treat the last two values as width/height.
        if right <= left or bottom <= top:
            right = left + max(1, right)
            bottom = top + max(1, bottom)
    # Ensure proper corner ordering.
    if left > right:
        left, right = right, left
    if top > bottom:
        top, bottom = bottom, top
    # Pad by 2% of the short side, bounded to [5, 30] pixels.
    pad = max(5, min(30, int(0.02 * min(img_w, img_h))))
    left = max(0, left - pad)
    top = max(0, top - pad)
    right = min(img_w - 1, right + pad)
    bottom = min(img_h - 1, bottom + pad)
    return [left, top, right, bottom]
def erase_regions_on_image(image_path: str, coords_map: dict, save_path: str):
    """Cover each mapped region with a white rectangle and save the result.

    :param image_path: source image file
    :param coords_map: {label: bbox} — bboxes handled by normalize_bbox
    :param save_path: destination file for the cleaned image
    """
    img = Image.open(image_path).convert('RGB')
    draw = ImageDraw.Draw(img)
    w, h = img.size
    for key, bbox in coords_map.items():
        nb = normalize_bbox(bbox, w, h)
        if nb is None:
            print(f"坐标格式错误,跳过 {key}: {bbox}")
            continue
        x1, y1, x2, y2 = nb
        draw.rectangle([x1, y1, x2, y2], fill=(255, 255, 255))
        print(f"已抹除 {key} 区域: {nb}")
    # os.makedirs("") raises FileNotFoundError; only create a real directory
    # (fixes saving to a bare filename in the current directory).
    parent = os.path.dirname(save_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    img.save(save_path)
    print(f"清理后的图片已保存: {save_path}")
def draw_debug_boxes(image_path: str, coords_map: dict, save_path: str):
    """Draw the predicted rectangles (with labels) on the image for manual review.

    :param image_path: source image file
    :param coords_map: {label: bbox} — bboxes handled by normalize_bbox
    :param save_path: destination file for the annotated image
    """
    img = Image.open(image_path).convert('RGB')
    draw = ImageDraw.Draw(img)
    w, h = img.size
    for key, bbox in coords_map.items():
        nb = normalize_bbox(bbox, w, h)
        if nb is None:
            print(f"跳过无法解析的坐标: {key} -> {bbox}")
            continue
        x1, y1, x2, y2 = nb
        draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 0), width=3)
        # Label just above the box, clamped to the top edge.
        draw.text((x1, max(0, y1 - 16)), key, fill=(255, 0, 0))
    # os.makedirs("") raises FileNotFoundError; only create a real directory.
    parent = os.path.dirname(save_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    img.save(save_path)
    print(f"调试框已生成: {save_path}")
def convert_ai_json_to_coords_map(result, img_w: int, img_h: int) -> dict:
    """
    Normalize the AI's JSON reply into {label: [x1, y1, x2, y2]}, accepting
    several shapes:
    1) {"rects":[{"text":"AGN","bbox_norm":{x1,y1,x2,y2},"bbox_px":{x1,y1,x2,y2}}]}
    2) {"AGN":[x1,y1,x2,y2], "UCLINK":[...], ...}
    3) {"rects":[{"label":"AGN","bbox":[x1,y1,x2,y2]}]}
    4) {"rects":[{"text":"AGN","x1":...,"y1":...,"x2":...,"y2":...}]}
    Returned values may be pixel or normalized coordinates; normalize_bbox
    handles that downstream.
    """
    coords_map: dict = {}
    def dict_to_list(b):
        # Accept {"x1":..,"y1":..,"x2":..,"y2":..} dicts as well as lists.
        if isinstance(b, dict):
            return [b.get("x1"), b.get("y1"), b.get("x2"), b.get("y2")]
        return b
    try:
        # Case A: top level is a dict
        if isinstance(result, dict):
            # A1: contains a "rects" list
            if "rects" in result and isinstance(result["rects"], list):
                for i, item in enumerate(result["rects"]):
                    if not isinstance(item, dict):
                        continue
                    label = item.get("text") or item.get("label") or item.get("word") or f"rect_{i}"
                    idx = item.get("occurrence_index")
                    # Disambiguate repeated words with "#<occurrence_index>".
                    key = f"{label}#{idx}" if isinstance(idx, int) and idx > 0 else label
                    bbox_px = dict_to_list(item.get("bbox_px") or item.get("bbox_pixels"))
                    bbox_norm = dict_to_list(item.get("bbox_norm"))
                    bbox_generic = dict_to_list(item.get("bbox"))
                    chosen = None
                    # When both pixel and normalized boxes exist, cross-check
                    # them; prefer pixels only when they agree (diff <= 4 px).
                    if isinstance(bbox_px, (list, tuple)) and len(bbox_px) == 4 and isinstance(bbox_norm, (list, tuple)) and len(bbox_norm) == 4:
                        try:
                            px_from_norm = [int(round(float(bbox_norm[0]) * img_w)),
                                            int(round(float(bbox_norm[1]) * img_h)),
                                            int(round(float(bbox_norm[2]) * img_w)),
                                            int(round(float(bbox_norm[3]) * img_h))]
                            diff = sum(abs(px_from_norm[j] - int(round(float(bbox_px[j])))) for j in range(4))
                            chosen = bbox_px if diff <= 4 else bbox_norm
                        except Exception:
                            chosen = bbox_px
                    elif isinstance(bbox_px, (list, tuple)) and len(bbox_px) == 4:
                        chosen = bbox_px
                    elif isinstance(bbox_norm, (list, tuple)) and len(bbox_norm) == 4:
                        chosen = bbox_norm
                    elif isinstance(bbox_generic, (list, tuple)) and len(bbox_generic) == 4:
                        chosen = bbox_generic
                    else:
                        # Direct x1/y1/x2/y2 fields on the item itself.
                        if all(k in item for k in ("x1", "y1", "x2", "y2")):
                            chosen = [item.get("x1"), item.get("y1"), item.get("x2"), item.get("y2")]
                    if isinstance(chosen, (list, tuple)) and len(chosen) == 4:
                        coords_map[key] = list(chosen)
                    else:
                        print(f"跳过无法解析的rect: {item}")
            else:
                # A2: simple key -> [x1,y1,x2,y2] mapping
                for k, v in result.items():
                    if isinstance(v, (list, tuple)) and len(v) == 4:
                        coords_map[k] = list(v)
        # Case B: top level is a list
        elif isinstance(result, list):
            for i, item in enumerate(result):
                if not isinstance(item, dict):
                    continue
                label = item.get("text") or item.get("label") or item.get("word") or f"rect_{i}"
                bbox = item.get("bbox_px") or item.get("bbox_norm") or item.get("bbox")
                bbox = dict_to_list(bbox)
                if isinstance(bbox, (list, tuple)) and len(bbox) == 4:
                    coords_map[label] = list(bbox)
        else:
            print("AI返回的JSON结构未知,无法解析。")
    except Exception as e:
        print(f"解析AI JSON时发生错误: {e}")
    return coords_map
def images_to_pdf(image_paths, output_pdf):
    """Merge a list of image files into a single multi-page PDF.

    :param image_paths: ordered list of image file paths (one page each)
    :param output_pdf: destination PDF path; parent dirs are created as needed
    :raises RuntimeError: if image_paths is empty
    """
    # os.path.dirname() is '' for a bare filename and os.makedirs('') raises,
    # so only create the directory when one is actually present.
    parent_dir = os.path.dirname(output_pdf)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    if not image_paths:
        raise RuntimeError("没有需要写入PDF的图片")
    # Pillow writes a multi-page PDF via save_all + append_images on page 1.
    pil_images = [Image.open(p).convert('RGB') for p in image_paths]
    first = pil_images[0]
    rest = pil_images[1:]
    first.save(output_pdf, save_all=True, append_images=rest)
    print(f"已生成PDF: {output_pdf}")
# Build the vision-model prompt (normalized coordinates only, strict JSON).
# The Chinese prompt below is runtime text sent to the model — it pins the
# coordinate system (origin top-left, {img_w}x{img_h} px) and the exact output
# schema, so keep it in sync with convert_ai_json_to_coords_map's parser.
text = f"""(仅归一化坐标,严格 JSON)
你是一名版面定位助手。请在下图中定位并分别框出以下四个单词:AGN、UCLINK、LOGISITICS、LTD。
坐标系与输出要求:
- 图像尺寸:宽 {img_w} 像素,高 {img_h} 像素。
- 原点位于图像左上角;x 向右增大,y 向下增大。
- 为每个目标词返回它的最小外接矩形框,边界紧贴字形,不要添加额外边距。
- 如果存在多处相同词,按从上到下的顺序全部返回,并用 occurrence_index 标识序号;若只出现一次则为 0。
- 返回坐标为相对宽高的归一化浮点数,范围 [0,1],保留 4 位小数;保证 0 ≤ x1 < x2 ≤ 1,0 ≤ y1 < y2 ≤ 1。
- 禁止任何图片预处理(裁剪、缩放、加边距、重采样);坐标必须对应原始图像。
- 严格只输出下面的压缩的 JSON,不要附加解释或其他文本。
- JSON中不要出现不在实例中的参数,例如bbox_2d
输出 JSON 格式(示例为格式演示,实际数值请识别后填充):"""
# Append the literal JSON answer template: a top-level LIST of four entries,
# one per target word, each with an occurrence_index and a bbox_norm object.
text += '[{"text":"AGN","occurrence_index":0,"bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"UCLINK","occurrence_index":0,"bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"LOGISITICS","occurrence_index":0,"bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"LTD","occurrence_index":0,"bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}}]'
# text += "例如:{\"AGN\": [0.1, 0.2, 0.3, 0.4], \"UCLINK\": [0.5, 0.6, 0.7, 0.8], \"LOGISITICS\": [0.9, 0.10, 0.11, 0.12], \"LTD\": [0.13, 0.14, 0.15, 0.16]}。确保json格式的正确性"
# Ask the vision model for the four word bounding boxes on page 1.
completion = client.chat.completions.create(
    model="qwen3-vl-plus",  # qwen3-vl-plus as an example; swap as needed. Model list: https://help.aliyun.com/zh/model-studio/models
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": image_base64
                    },
                },
                {"type": "text", "text": text},
            ],
        },
    ],
    temperature=0.1,  # low temperature for stable, deterministic coordinates
)
raw_text = completion.choices[0].message.content
# raw_text = '```json[{"bbox_norm": {"x1": 0.1028, "y1": 0.1934, "x2": 0.1325, "y2": 0.2006}, "text": "AGN", "occurrence_index": 0},{"bbox_norm": {"x1": 0.1028, "y1": 0.2057, "x2": 0.1608, "y2": 0.2165}, "text": "UCLINK", "occurrence_index": 0},{"bbox_norm": {"x1": 0.1677, "y1": 0.2057, "x2": 0.2657, "y2": 0.2165}, "text": "LOGISITICS", "occurrence_index": 0},{"bbox_norm": {"x1": 0.2726, "y1": 0.2057, "x2": 0.3023, "y2": 0.2165}, "text": "LTD", "occurrence_index": 0}]```'
print(raw_text)
result = safe_extract_json(raw_text)
# NOTE(review): the prompt above demands a top-level JSON *list*, yet this
# guard rejects anything that is not a dict — a well-formed list response
# (like the commented sample above) would raise here. Confirm which shape
# the model actually returns and align the check.
if result is None or not isinstance(result, dict):
    raise RuntimeError("模型返回内容无法解析为JSON坐标,请检查返回格式。")
# Only process page 1: write the erased image to output/cleaned_page_1.png,
# then rebuild the PDF.
cleaned_dir = os.path.join("./output")
cleaned_first = os.path.join(cleaned_dir, "cleaned_page_1.png")
debug_first = os.path.join(cleaned_dir, "debug_page_1.png")
coords_map = convert_ai_json_to_coords_map(result, img_w, img_h)
if not coords_map:
    raise RuntimeError("无法从AI返回中提取矩形框坐标,请检查输出格式或提示词。")
print(f"解析并统一后的坐标字典: {coords_map}")
draw_debug_boxes(image_paths[0], coords_map, debug_first)
erase_regions_on_image(image_paths[0], coords_map, cleaned_first)
# Rebuild the PDF: page 1 uses the cleaned image, remaining pages keep the originals.
final_images = [cleaned_first] + image_paths[1:]
images_to_pdf(final_images, os.path.join(cleaned_dir, "cleaned.pdf"))
import os
from openai import OpenAI
import requests
import mimetypes
import base64
import fitz # PyMuPDF
import json
from PIL import Image, ImageDraw
# OpenAI-compatible client for Alibaba Cloud Model Studio (DashScope).
client = OpenAI(
    # SECURITY(review): API key is hard-coded in source — move it to an
    # environment variable or a secret store before deploying.
    # Getting an API Key: https://help.aliyun.com/zh/model-studio/get-api-key
    api_key='sk-e41914f0d9c94035a5ae1322e9a61fb1',
    # Beijing-region base_url; for the Singapore region use:
    # https://dashscope-intl.aliyuncs.com/compatible-mode/v1
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
# Input PDF whose first page contains the text to be located and erased.
pdf_path = "C:/Users/Administrator/Desktop/43610281036.pdf"
def pdf_to_images(pdf_path, output_dir='./pdf_pages', dpi=150):
    """Render every page of a PDF to a PNG file.

    :param pdf_path: path of the PDF file
    :param output_dir: output directory, defaults to ./pdf_pages
    :param dpi: render resolution, defaults to 150
    :return: list of generated image paths, in page order
    """
    os.makedirs(output_dir, exist_ok=True)
    # 72 is the PDF's native point density; scale the render matrix from it.
    zoom = fitz.Matrix(dpi / 72, dpi / 72)
    document = fitz.open(pdf_path)
    rendered = []
    for index in range(len(document)):
        pixmap = document.load_page(index).get_pixmap(matrix=zoom)
        target = os.path.join(output_dir, f"page_{index + 1}.png")
        pixmap.save(target)
        rendered.append(target)
    document.close()
    return rendered
def download_image(image_url, save_path='output.png'):
    """Stream-download an image URL into a local file, logging the outcome."""
    try:
        resp = requests.get(image_url, stream=True, timeout=300)  # bounded wait
        resp.raise_for_status()  # any non-2xx HTTP status raises
        with open(save_path, 'wb') as out:
            for block in resp.iter_content(chunk_size=8192):
                out.write(block)
    except requests.exceptions.RequestException as err:
        print(f"图像下载失败: {err}")
    else:
        print(f"图像已成功下载到: {save_path}")
def encode_file(file_path):
    """Encode an image file as a base64 data URI (data:<mime>;base64,...)."""
    guessed, _ = mimetypes.guess_type(file_path)
    # Only image/* MIME types are accepted; anything else is rejected up front.
    if not guessed or not guessed.startswith("image/"):
        raise ValueError("不支持或无法识别的图像格式")
    with open(file_path, "rb") as handle:
        payload = base64.b64encode(handle.read()).decode('utf-8')
    return f"data:{guessed};base64,{payload}"
# Split the source PDF into page images and base64-encode page 1 for the model.
image_paths = pdf_to_images(pdf_path)
image_base64 = encode_file(image_paths[0])
# Record the page resolution; the prompt asks the model to return coordinates
# normalized (0-1 floats) relative to this width/height.
img_w, img_h = Image.open(image_paths[0]).size
print(f"页面尺寸: {img_w}x{img_h} 像素")
def safe_extract_json(text: str):
    """Best-effort extraction of a JSON value from model output text.

    Handles raw JSON, JSON wrapped in markdown code fences (```json ... ```,
    which the prompts in this file explicitly invite by demanding JSON-only
    output), and JSON objects or arrays embedded in surrounding prose.

    :param text: raw model response text
    :return: the parsed JSON value (dict/list/...), or None if nothing parses
    """
    def _try_load(chunk):
        # Narrowly-scoped helper: parse or report failure as None.
        try:
            return json.loads(chunk)
        except Exception:
            return None

    parsed = _try_load(text)
    if parsed is not None:
        return parsed
    # Strip markdown code fences, e.g. '```json[...]```' or '```\n{...}\n```'.
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned.strip('`').strip()
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:].strip()
        parsed = _try_load(cleaned)
        if parsed is not None:
            return parsed
    else:
        cleaned = text
    # Fall back to the outermost {...} or [...] span (object first, matching
    # the original behavior; arrays are also accepted now).
    for opener, closer in (('{', '}'), ('[', ']')):
        start = cleaned.find(opener)
        end = cleaned.rfind(closer)
        if start != -1 and end > start:
            candidate = cleaned[start:end + 1]
            parsed = _try_load(candidate)
            if parsed is None:
                # Some models emit single-quoted pseudo-JSON; retry with
                # quotes swapped (best effort, may still fail).
                parsed = _try_load(candidate.replace("'", '"'))
            if parsed is not None:
                return parsed
    return None
def normalize_bbox(bbox, img_w, img_h):
    """Normalize a bounding box into clamped pixel coordinates [x1, y1, x2, y2].

    Accepted input forms:
    - pixel coordinates;
    - normalized (0-1) coordinates;
    - [x1, y1, w, h] (detected when x2 <= x1 or y2 <= y1 in pixel form).

    Returns None for anything that is not a 4-element list/tuple.
    """
    if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
        return None
    left, top, right, bottom = bbox
    # All four values inside [0, 1] is taken to mean normalized coordinates.
    if all(0 <= v <= 1 for v in (left, top, right, bottom)):
        left = int(round(left * img_w))
        top = int(round(top * img_h))
        right = int(round(right * img_w))
        bottom = int(round(bottom * img_h))
    else:
        # Pixel coordinates, possibly in [x1, y1, w, h] form.
        left, top, right, bottom = (
            int(round(v)) for v in (left, top, right, bottom)
        )
        if right <= left or bottom <= top:
            # Degenerate extents: treat the last two values as width/height.
            right = left + max(1, right)
            bottom = top + max(1, bottom)
    # Repair swapped corners.
    if left > right:
        left, right = right, left
    if top > bottom:
        top, bottom = bottom, top
    # Pad by 2% of the short side, clamped to [5, 30] px, then keep in-bounds.
    margin = max(5, min(30, int(0.02 * min(img_w, img_h))))
    return [
        max(0, left - margin),
        max(0, top - margin),
        min(img_w - 1, right + margin),
        min(img_h - 1, bottom + margin),
    ]
def erase_regions_on_image(image_path: str, coords_map: dict, save_path: str):
    """Paint white rectangles over every region in coords_map and save a copy.

    :param image_path: source image path
    :param coords_map: {label: bbox} where bbox is any format normalize_bbox accepts
    :param save_path: destination path for the cleaned image
    """
    img = Image.open(image_path).convert('RGB')
    draw = ImageDraw.Draw(img)
    w, h = img.size
    for key, bbox in coords_map.items():
        nb = normalize_bbox(bbox, w, h)
        if nb is None:
            print(f"坐标格式错误,跳过 {key}: {bbox}")
            continue
        x1, y1, x2, y2 = nb
        # White fill — assumes a white document background; TODO confirm.
        draw.rectangle([x1, y1, x2, y2], fill=(255, 255, 255))
        print(f"已抹除 {key} 区域: {nb}")
    # os.path.dirname() is '' for a bare filename and os.makedirs('') raises,
    # so only create the directory when one is actually present.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    img.save(save_path)
    print(f"清理后的图片已保存: {save_path}")
def draw_debug_boxes(image_path: str, coords_map: dict, save_path: str):
    """Draw the predicted rectangles on a copy of the image for manual review.

    :param image_path: source image path
    :param coords_map: {label: bbox} where bbox is any format normalize_bbox accepts
    :param save_path: destination path for the annotated image
    """
    img = Image.open(image_path).convert('RGB')
    draw = ImageDraw.Draw(img)
    w, h = img.size
    for key, bbox in coords_map.items():
        nb = normalize_bbox(bbox, w, h)
        if nb is None:
            print(f"跳过无法解析的坐标: {key} -> {bbox}")
            continue
        x1, y1, x2, y2 = nb
        draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 0), width=3)
        # Label just above the box, clamped so it stays inside the image.
        draw.text((x1, max(0, y1 - 16)), key, fill=(255, 0, 0))
    # os.path.dirname() is '' for a bare filename and os.makedirs('') raises,
    # so only create the directory when one is actually present.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    img.save(save_path)
    print(f"调试框已生成: {save_path}")
def images_to_pdf(image_paths, output_pdf):
    """Merge a list of image files into a single multi-page PDF.

    NOTE(review): this redefines images_to_pdf from earlier in the file with
    identical behavior; the two copies should be deduplicated.

    :param image_paths: ordered list of image file paths (one page each)
    :param output_pdf: destination PDF path; parent dirs are created as needed
    :raises RuntimeError: if image_paths is empty
    """
    # os.path.dirname() is '' for a bare filename and os.makedirs('') raises,
    # so only create the directory when one is actually present.
    parent_dir = os.path.dirname(output_pdf)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    if not image_paths:
        raise RuntimeError("没有需要写入PDF的图片")
    # Pillow writes a multi-page PDF via save_all + append_images on page 1.
    pil_images = [Image.open(p).convert('RGB') for p in image_paths]
    first = pil_images[0]
    rest = pil_images[1:]
    first.save(output_pdf, save_all=True, append_images=rest)
    print(f"已生成PDF: {output_pdf}")
# Second, simpler model call: request just two normalized boxes keyed by the
# literal text ("AGN" and "UCLINK LOGISITICS LTD") as compact JSON.
completion = client.chat.completions.create(
    model="qwen3-vl-plus",  # qwen3-vl-plus as an example; swap as needed. Model list: https://help.aliyun.com/zh/model-studio/models
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": image_base64
                    },
                },
                {"type": "text", "text": f"图像分辨率为{img_w}x{img_h}像素。坐标系定义:以原始图像左上角为原点(0,0),x向右增加,y向下增加;不要使用任何预处理(缩放或加黑边)产生的坐标。请仅返回这两个文本的矩形框坐标,且必须是归一化到[0,1]的浮点数(相对于原始图像宽高),返回格式严格为压缩JSON、无任何解释:{{\"AGN\": [x1_rel, y1_rel, x2_rel, y2_rel], \"UCLINK LOGISITICS LTD\": [x3_rel, y3_rel, x4_rel, y4_rel]}}。"},
            ],
        },
    ],
)
raw_text = completion.choices[0].message.content
print(raw_text)
result = safe_extract_json(raw_text)
if result is None or not isinstance(result, dict):
    raise RuntimeError("模型返回内容无法解析为JSON坐标,请检查返回格式。")
# Only process page 1: write the erased image to output/cleaned_page_1.png,
# then rebuild the PDF.
cleaned_dir = os.path.join("./output")
cleaned_first = os.path.join(cleaned_dir, "cleaned_page_1.png")
debug_first = os.path.join(cleaned_dir, "debug_page_1.png")
# The model's {label: bbox} dict is consumed directly (no occurrence handling here).
draw_debug_boxes(image_paths[0], result, debug_first)
erase_regions_on_image(image_paths[0], result, cleaned_first)
# Rebuild the PDF: page 1 uses the cleaned image, remaining pages keep the originals.
final_images = [cleaned_first] + image_paths[1:]
images_to_pdf(final_images, os.path.join(cleaned_dir, "cleaned.pdf"))
...@@ -351,6 +351,7 @@ class CcBl(models.Model): ...@@ -351,6 +351,7 @@ class CcBl(models.Model):
bl_sync_log_ids = fields.One2many('cc.bl.sync.log', 'bl_id', string='Bill Of Loading Sync Logs') bl_sync_log_ids = fields.One2many('cc.bl.sync.log', 'bl_id', string='Bill Of Loading Sync Logs')
# 增加提单状态操作时间:取最新一条提单节点同步信息的操作时间 # 增加提单状态操作时间:取最新一条提单节点同步信息的操作时间
process_time = fields.Datetime(string='Process Time', compute='_compute_process_time', store=True) process_time = fields.Datetime(string='Process Time', compute='_compute_process_time', store=True)
push_remark = fields.Text('Push Remark')
def change_state_by_ship_package(self): def change_state_by_ship_package(self):
""" """
......
...@@ -3,4 +3,5 @@ ...@@ -3,4 +3,5 @@
from . import batch_input_ship_package_statu_wizard from . import batch_input_ship_package_statu_wizard
from . import update_bl_status_wizard from . import update_bl_status_wizard
from . import batch_get_pod_info_wizard
# -*- coding: utf-8 -*-
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
from odoo import models, fields, api, _
_logger = logging.getLogger(__name__)
class BatchGetPodInfoWizard(models.TransientModel):
    _inherit = 'batch.get.pod.info.wizard'
    _description = 'Batch Get POD Info Wizard'  # Batch POD info retrieval wizard

    def _validate_node_push_conditions(self, processed_files):
        """
        Validate node-push conditions (simplified version).

        Step 1: the package's current-node operation time must be earlier than
        the time extracted from the POD (compared in the same timezone).
        Step 2: for packages needing a supplementary push, reject those where
        (extracted time - current-node operation time) < the preceding
        interval time.

        :param processed_files: list of processed file dicts
        :return: list of file dicts that satisfy the push conditions
        """
        valid_files = []
        for file_info in processed_files:
            if not file_info.get('bl'):
                continue
            bl = file_info['bl']
            _logger.info(f"开始验证提单 {bl.bl_no} 的节点推送条件")
            try:
                # Extract candidate timestamps from the POD PDF / OCR texts.
                extracted_times = self._extract_time_from_pdf(
                    file_info.get('file_data'),
                    bl.bl_no,
                    file_info.get('ocr_texts')
                )
                if not extracted_times:
                    _logger.warning(f"提单 {bl.bl_no} 未提取到时间信息,跳过节点推送")
                    continue
                # Use the earliest extracted timestamp as the reference time.
                extract_time = min(extracted_times)
                _logger.info(f"提单 {bl.bl_no} 最早提取时间: {extract_time}")
                # Fetch the small packages linked to this bill of lading.
                ship_packages = bl.ship_package_ids
                if not ship_packages:
                    _logger.warning(f"提单 {bl.bl_no} 没有关联的小包,跳过节点推送")
                    continue
                # Fast path: if every package shares one status, one check suffices.
                is_same_status = self._check_packages_same_status(ship_packages)
                if is_same_status:
                    # Same status everywhere: validate only the first package.
                    _logger.info(f"提单 {bl.bl_no} 所有小包状态一致,只验证第一个小包")
                    first_package = ship_packages[0]
                    if self._validate_package_push_condition(first_package, extract_time, bl.bl_no):
                        # First package passes -> all packages pass.
                        file_info['valid_packages'] = ship_packages
                        valid_files.append(file_info)
                        _logger.info(f"提单 {bl.bl_no} 所有 {len(ship_packages)} 个小包都满足推送条件")
                    else:
                        # First package fails -> all packages fail; record a remark.
                        self._add_bl_push_remark(bl, len(ship_packages), extract_time)
                        _logger.warning(f"提单 {bl.bl_no} 所有 {len(ship_packages)} 个小包都不满足推送条件")
                else:
                    # Mixed statuses: validate each package individually.
                    _logger.info(f"提单 {bl.bl_no} 小包状态不一致,逐个验证")
                    valid_packages = []
                    invalid_count = 0
                    for package in ship_packages:
                        if self._validate_package_push_condition(package, extract_time, bl.bl_no):
                            valid_packages.append(package)
                        else:
                            invalid_count += 1
                    if valid_packages:
                        file_info['valid_packages'] = valid_packages
                        valid_files.append(file_info)
                        _logger.info(f"提单 {bl.bl_no} 有 {len(valid_packages)} 个小包满足推送条件")
                    if invalid_count > 0:
                        self._add_bl_push_remark(bl, invalid_count, extract_time)
                        _logger.warning(f"提单 {bl.bl_no} 有 {invalid_count} 个小包不满足推送条件")
            except Exception as e:
                # Best-effort per bill: a failure on one bill must not stop the batch.
                _logger.error(f"验证提单 {bl.bl_no} 节点推送条件失败: {str(e)}")
                continue
        return valid_files

    def _check_packages_same_status(self, ship_packages):
        """
        Check whether all packages share the same status (batch-optimized).

        :param ship_packages: package recordset
        :return: True if all packages are in the same status
        """
        try:
            if not ship_packages or len(ship_packages) <= 1:
                return True
            # Batch-fetch the distinct status IDs of all packages in one query.
            package_ids = ship_packages.ids
            sql_query = """
                SELECT DISTINCT state
                FROM cc_ship_package
                WHERE id IN %s AND state IS NOT NULL
            """
            self.env.cr.execute(sql_query, (tuple(package_ids),))
            results = self.env.cr.fetchall()
            # A single distinct status ID means every package is in that status.
            unique_status_count = len(results)
            _logger.info(f"提单小包状态检查: {len(package_ids)} 个小包,{unique_status_count} 种不同状态")
            return unique_status_count <= 1
        except Exception as e:
            _logger.error(f"检查小包状态一致性失败: {str(e)}")
            return False

    def _get_package_operation_times(self, package_ids):
        """
        Get the current-node operation time for packages (single or batch).

        For each package, looks up the latest sync-log time whose process_code
        matches the package's current node code (tk_code).

        :param package_ids: list of package IDs, or a single package ID
        :return: {package_id: operation_time} dict, or a single operation time
        """
        try:
            # Accept both a single ID and a list of IDs.
            if isinstance(package_ids, int):
                package_ids = [package_ids]
                single_mode = True
            else:
                single_mode = False
            if not package_ids:
                return None if single_mode else {}
            # One SQL query: per package, the MAX sync-log operate_time for
            # the package's current node (joined on process_code = tk_code).
            sql_query = """
                SELECT
                    p.id,
                    MAX(n.operate_time) as latest_operation_time
                FROM cc_ship_package p
                LEFT JOIN cc_ship_package_sync_log n ON p.id = n.package_id
                    AND n.process_code = p.tk_code
                WHERE p.id IN %s
                GROUP BY p.id
            """
            self.env.cr.execute(sql_query, (tuple(package_ids),))
            results = self.env.cr.fetchall()
            operation_times = {}
            for package_id, operate_time in results:
                if operate_time:
                    operation_times[package_id] = operate_time
            _logger.info(f"获取到 {len(operation_times)} 个小包的操作时间")
            # Single mode returns the bare value for the requested package.
            if single_mode:
                return operation_times.get(package_ids[0])
            else:
                return operation_times
        except Exception as e:
            _logger.error(f"获取小包操作时间失败: {str(e)}")
            return None if single_mode else {}

    def _add_bl_push_remark(self, bl, failed_count, extract_time):
        """
        Append a push-failure remark on the bill of lading.

        :param bl: bill-of-lading record
        :param failed_count: number of packages that failed validation
        :param extract_time: extracted reference time
        """
        try:
            # Build the remark text (user-facing, intentionally in Chinese).
            remark = f"获取尾程POD,自动推送节点失败,有风险产生倒挂。失败小包数量: {failed_count},请手动操作{extract_time.strftime('%Y-%m-%d %H:%M:%S')}(获取时间)"
            # Only write when the bill actually exposes a push_remark field.
            if hasattr(bl, 'push_remark'):
                # Preserve any existing remark and append the new one.
                existing_remark = bl.push_remark or ""
                new_remark = f"{existing_remark}\n{remark}" if existing_remark else remark
                bl.write({'push_remark': new_remark})
        except Exception as e:
            _logger.error(f"为提单 {bl.bl_no} 添加推送备注失败: {str(e)}")

    def _validate_package_push_condition(self, package, extract_time, bl_no):
        """
        Validate the push conditions for a single package (simplified version).

        Step 1: the package's current-node operation time must be earlier than
        the extracted time (same-timezone comparison).
        Step 2: for supplementary pushes, reject when
        (extracted time - current-node operation time) < preceding interval.

        :param package: package record
        :param extract_time: extracted reference time
        :param bl_no: bill-of-lading number (logging context only)
        :return: True if the package may be pushed
        """
        try:
            # Latest operation time of the package's current node.
            current_node_operation_time = self._get_package_operation_times(package.id)
            # No operation time means nothing was pushed yet — pushing is allowed.
            if not current_node_operation_time:
                return True
            # Step 1: current-node time must be strictly before the extracted time.
            if current_node_operation_time >= extract_time:
                _logger.warning(f"小包 {package.id} 当前节点操作时间 {current_node_operation_time} >= 提取时间 {extract_time},不满足推送条件")
                return False
            # Step 2: supplementary pushes must respect the preceding interval.
            flag, match_node = self._need_supplement_push(package)
            if flag:
                interval_time = self._get_write_node_previous_interval_time(package, match_node)
                if interval_time:
                    # Elapsed time between extraction and the current-node operation.
                    time_diff = extract_time - current_node_operation_time
                    if time_diff < interval_time:
                        _logger.warning(f"小包 {package.id} 时间差 {time_diff} < 前序间隔时间 {interval_time},不满足补推条件")
                        return False
                    else:
                        _logger.info(f"小包 {package.id} 时间差 {time_diff} >= 前序间隔时间 {interval_time},满足补推条件")
            _logger.info(f"小包 {package.id} 满足推送条件")
            return True
        except Exception as e:
            _logger.error(f"验证小包 {package.id} 推送条件失败: {str(e)}")
            return False

    def _need_supplement_push(self, package):
        """
        Decide whether the package needs a supplementary node push.

        :param package: package record
        :return: (needs_push, pod_node) tuple
        """
        pod_node = False
        try:
            # Find the package-level node flagged for POD matching.
            pod_node = self.env['cc.node'].search([
                ('is_pod_node', '=', True),
                ('node_type', '=', 'package')
            ], limit=1)
            # If the package's node precedes the POD node, a supplementary push is needed.
            # NOTE(review): this compares record IDs to decide node ordering —
            # IDs reflect creation order, not a configured sequence; confirm this
            # is intended (a dedicated sequence field would be safer).
            if pod_node:
                return package.state.id < pod_node.id, pod_node
            return False, pod_node
        except Exception as e:
            _logger.error(f"判断小包 {package.id} 是否需要补推失败: {str(e)}")
            return False, pod_node

    def _get_write_node_previous_interval_time(self, package, match_node):
        """
        Get the preceding interval time before the write node.

        :param package: package record
        :param match_node: matched POD node
        :return: preceding interval as a timedelta, or None
        """
        try:
            # Total configured interval (in minutes) from the package's
            # current state up to the matched node.
            before_minutes = package.state.calculate_total_interval(match_node)
            if before_minutes and before_minutes > 0:
                # Convert minutes to a timedelta for datetime arithmetic.
                from datetime import timedelta
                return timedelta(minutes=before_minutes)
            return None
        except Exception as e:
            _logger.error(f"获取小包 {package.id} 前序间隔时间失败: {str(e)}")
            return None
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论