Commit 412ebd58 authored by 贺阳

The times extracted from the PDF are in the current user's timezone and need to be converted to UTC (zone 0) before being compared with the small parcel's current node.

Parent 20a702a1
......@@ -94,7 +94,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
        # If text redaction is enabled, process the files
if self.remove_specified_text and processed_files:
processed_files = self._remove_specified_text(processed_files, debug_mode=False)
# processed_files = self._remove_specified_text(processed_files, debug_mode=False)
        # Merge the PDFs and save the result to the pdf_file field
self._merge_pdf_files(processed_files)
......
import os
from openai import OpenAI
import requests
import mimetypes
import base64
import fitz # PyMuPDF
import json
from PIL import Image, ImageDraw
client = OpenAI(
    # If no environment variable is configured, replace the next line with your Alibaba Cloud Model Studio API key: api_key="sk-xxx",
    # The Singapore and Beijing regions use different API keys. Get an API key: https://help.aliyun.com/zh/model-studio/get-api-key
api_key='sk-e41914f0d9c94035a5ae1322e9a61fb1',
    # The base_url below is for the Beijing region; for models in the Singapore region, replace it with: https://dashscope-intl.aliyuncs.com/compatible-mode/v1
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
pdf_path = "C:/Users/Administrator/Desktop/43610236590 (3).pdf"
def pdf_to_images(pdf_path, output_dir='./pdf_pages', dpi=150):
"""
将PDF文件逐页转为PNG图片
:param pdf_path: PDF文件路径
:param output_dir: 输出目录,默认当前目录下pdf_pages文件夹
:param dpi: 渲染分辨率,默认150
:return: 生成的图片路径列表
"""
os.makedirs(output_dir, exist_ok=True)
doc = fitz.open(pdf_path)
image_paths = []
for page_num in range(len(doc)):
page = doc.load_page(page_num)
mat = fitz.Matrix(dpi / 72, dpi / 72)
pix = page.get_pixmap(matrix=mat)
output_path = os.path.join(output_dir, f"page_{page_num + 1}.png")
pix.save(output_path)
image_paths.append(output_path)
doc.close()
return image_paths
def download_image(image_url, save_path='output.png'):
try:
        response = requests.get(image_url, stream=True, timeout=300)  # set a timeout
        response.raise_for_status()  # raise for HTTP error status codes (4xx/5xx)
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"图像已成功下载到: {save_path}")
except requests.exceptions.RequestException as e:
print(f"图像下载失败: {e}")
def encode_file(file_path):
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type or not mime_type.startswith("image/"):
raise ValueError("不支持或无法识别的图像格式")
with open(file_path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
return f"data:{mime_type};base64,{encoded_string}"
image_paths = pdf_to_images(pdf_path)
image_base64 = encode_file(image_paths[0])
# Get the image resolution and ask the model, in the prompt, to return coordinates as ratios (floats in 0-1 relative to width and height)
img_w, img_h = Image.open(image_paths[0]).size
print(f"页面尺寸: {img_w}x{img_h} 像素")
def safe_extract_json(text: str):
"""从模型返回文本中尽可能鲁棒地提取JSON对象。"""
# 直接尝试解析
try:
return json.loads(text)
except Exception:
pass
    # Try the content between the first '{' and the last '}'
start = text.find('{')
end = text.rfind('}')
if start != -1 and end != -1 and end > start:
candidate = text[start:end+1]
try:
return json.loads(candidate)
except Exception:
            # Replace single quotes with double quotes and retry (some models return single-quoted JSON-like text)
candidate2 = candidate.replace("'", '"')
try:
return json.loads(candidate2)
except Exception:
return None
return None
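# Minimal sanity checks for safe_extract_json (illustrative sketch only, not used by the rest of this
# script): it should cope with plain JSON, JSON embedded in surrounding prose, and single-quoted pseudo-JSON.
assert safe_extract_json('{"AGN": [0.1, 0.2, 0.3, 0.4]}') == {"AGN": [0.1, 0.2, 0.3, 0.4]}
assert safe_extract_json("here you go: {'AGN': [1, 2, 3, 4]} hope that helps") == {"AGN": [1, 2, 3, 4]}
assert safe_extract_json("no json here at all") is None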
def normalize_bbox(bbox, img_w, img_h):
"""
规范化bbox为像素坐标 [x1,y1,x2,y2],并保证边界有效。
支持:
- 像素坐标;
- 归一化坐标(0-1);
- [x1,y1,w,h] 形式(若x2<=x1或y2<=y1则按宽高处理)。
"""
if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
return None
x1, y1, x2, y2 = bbox
    # Possibly normalized coordinates
if 0 <= x1 <= 1 and 0 <= y1 <= 1 and 0 <= x2 <= 1 and 0 <= y2 <= 1:
x1 = int(round(x1 * img_w))
y1 = int(round(y1 * img_h))
x2 = int(round(x2 * img_w))
y2 = int(round(y2 * img_h))
else:
        # Pixel coordinates or [x1, y1, w, h]
x1 = int(round(x1))
y1 = int(round(y1))
x2 = int(round(x2))
y2 = int(round(y2))
        # If x2<=x1 or y2<=y1, treat the last two values as width/height
if x2 <= x1 or y2 <= y1:
x2 = x1 + max(1, x2)
y2 = y1 + max(1, y2)
    # Fix the bounds and add a small margin
if x1 > x2:
x1, x2 = x2, x1
if y1 > y2:
y1, y2 = y2, y1
    # Margin is 2% of the short side (at least 5px, at most 30px)
short_side = min(img_w, img_h)
margin = max(5, min(30, int(0.02 * short_side)))
x1 = max(0, x1 - margin)
y1 = max(0, y1 - margin)
x2 = min(img_w - 1, x2 + margin)
y2 = min(img_h - 1, y2 + margin)
return [x1, y1, x2, y2]
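# Illustrative sanity checks for normalize_bbox (a sketch assuming a 1000x800 page, where the margin
# works out to 16px): normalized, pixel, and [x1, y1, w, h] inputs all end up as padded pixel boxes.
assert normalize_bbox([0.1, 0.1, 0.2, 0.2], 1000, 800) == [84, 64, 216, 176]  # normalized -> pixels
assert normalize_bbox([100, 80, 200, 160], 1000, 800) == [84, 64, 216, 176]   # pixels, plus the margin
assert normalize_bbox([100, 80, 50, 40], 1000, 800) == [84, 64, 166, 136]     # [x1, y1, w, h] expanded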
def erase_regions_on_image(image_path: str, coords_map: dict, save_path: str):
img = Image.open(image_path).convert('RGB')
draw = ImageDraw.Draw(img)
w, h = img.size
for key, bbox in coords_map.items():
nb = normalize_bbox(bbox, w, h)
if nb is None:
print(f"坐标格式错误,跳过 {key}: {bbox}")
continue
x1, y1, x2, y2 = nb
draw.rectangle([x1, y1, x2, y2], fill=(255, 255, 255))
print(f"已抹除 {key} 区域: {nb}")
os.makedirs(os.path.dirname(save_path), exist_ok=True)
img.save(save_path)
print(f"清理后的图片已保存: {save_path}")
def draw_debug_boxes(image_path: str, coords_map: dict, save_path: str):
"""在原图上绘制预测的矩形框用于人工核对。"""
img = Image.open(image_path).convert('RGB')
draw = ImageDraw.Draw(img)
w, h = img.size
for key, bbox in coords_map.items():
nb = normalize_bbox(bbox, w, h)
if nb is None:
print(f"跳过无法解析的坐标: {key} -> {bbox}")
continue
x1, y1, x2, y2 = nb
draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 0), width=3)
draw.text((x1, max(0, y1 - 16)), key, fill=(255, 0, 0))
os.makedirs(os.path.dirname(save_path), exist_ok=True)
img.save(save_path)
print(f"调试框已生成: {save_path}")
def images_to_pdf(image_paths, output_pdf):
os.makedirs(os.path.dirname(output_pdf), exist_ok=True)
pil_images = [Image.open(p).convert('RGB') for p in image_paths]
if not pil_images:
raise RuntimeError("没有需要写入PDF的图片")
first = pil_images[0]
rest = pil_images[1:]
first.save(output_pdf, save_all=True, append_images=rest)
print(f"已生成PDF: {output_pdf}")
completion = client.chat.completions.create(
model="qwen3-vl-plus", # 此处以qwen3-vl-plus为例,可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/models
messages=[
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": image_base64
},
},
{"type": "text", "text": f"图像分辨率为{img_w}x{img_h}像素。坐标系定义:以原始图像左上角为原点(0,0),x向右增加,y向下增加;不要使用任何预处理(缩放或加黑边)产生的坐标。请仅返回这两个文本的矩形框坐标,且必须是归一化到[0,1]的浮点数(相对于原始图像宽高),返回格式严格为压缩JSON、无任何解释:{{\"AGN\": [x1_rel, y1_rel, x2_rel, y2_rel], \"UCLINK LOGISITICS LTD\": [x3_rel, y3_rel, x4_rel, y4_rel]}}。"},
],
},
],
temperature=0.1,
)
raw_text = completion.choices[0].message.content
print(raw_text)
result = safe_extract_json(raw_text)
if result is None or not isinstance(result, dict):
raise RuntimeError("模型返回内容无法解析为JSON坐标,请检查返回格式。")
# Only process the first page: write the redacted image to output/cleaned_page_1.png, then rebuild the PDF
cleaned_dir = os.path.join("./output")
cleaned_first = os.path.join(cleaned_dir, "cleaned_page_1.png")
debug_first = os.path.join(cleaned_dir, "debug_page_1.png")
draw_debug_boxes(image_paths[0], result, debug_first)
erase_regions_on_image(image_paths[0], result, cleaned_first)
# Assemble the PDF: use the cleaned image for page 1 and keep the original images for the remaining pages
final_images = [cleaned_first] + image_paths[1:]
images_to_pdf(final_images, os.path.join(cleaned_dir, "cleaned.pdf"))
import os
from openai import OpenAI
import requests
import mimetypes
import base64
import fitz # PyMuPDF
import json
from PIL import Image, ImageDraw
client = OpenAI(
    # If no environment variable is configured, replace the next line with your Alibaba Cloud Model Studio API key: api_key="sk-xxx",
    # The Singapore and Beijing regions use different API keys. Get an API key: https://help.aliyun.com/zh/model-studio/get-api-key
api_key='sk-e41914f0d9c94035a5ae1322e9a61fb1',
    # The base_url below is for the Beijing region; for models in the Singapore region, replace it with: https://dashscope-intl.aliyuncs.com/compatible-mode/v1
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
# pdf_path = "C:/Users/Administrator/Desktop/43610236590 (3).pdf"
pdf_path = "C:/Users/Administrator/Desktop/43610281036.pdf"
def pdf_to_images(pdf_path, output_dir='./pdf_pages', dpi=150):
"""
将PDF文件逐页转为PNG图片
:param pdf_path: PDF文件路径
:param output_dir: 输出目录,默认当前目录下pdf_pages文件夹
:param dpi: 渲染分辨率,默认150
:return: 生成的图片路径列表
"""
os.makedirs(output_dir, exist_ok=True)
doc = fitz.open(pdf_path)
image_paths = []
for page_num in range(len(doc)):
page = doc.load_page(page_num)
mat = fitz.Matrix(dpi / 72, dpi / 72)
pix = page.get_pixmap(matrix=mat)
output_path = os.path.join(output_dir, f"page_{page_num + 1}.png")
pix.save(output_path)
image_paths.append(output_path)
doc.close()
return image_paths
def download_image(image_url, save_path='output.png'):
try:
        response = requests.get(image_url, stream=True, timeout=300)  # set a timeout
        response.raise_for_status()  # raise for HTTP error status codes (4xx/5xx)
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"图像已成功下载到: {save_path}")
except requests.exceptions.RequestException as e:
print(f"图像下载失败: {e}")
def encode_file(file_path):
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type or not mime_type.startswith("image/"):
raise ValueError("不支持或无法识别的图像格式")
with open(file_path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
return f"data:{mime_type};base64,{encoded_string}"
image_paths = pdf_to_images(pdf_path)
image_base64 = encode_file(image_paths[0])
# Get the image resolution and ask the model, in the prompt, to return coordinates as ratios (floats in 0-1 relative to width and height)
img_w, img_h = Image.open(image_paths[0]).size
print(f"页面尺寸: {img_w}x{img_h} 像素")
def safe_extract_json(text: str):
"""从模型返回文本中尽可能鲁棒地提取JSON对象。"""
# 直接尝试解析
try:
if text.startswith("```json"):
text = text[7:-3].strip()
obj = json.loads(text)
if isinstance(obj, list):
return {'rects': obj}
return obj
except Exception:
pass
print(text)
    # Try the content between the first '{' and the last '}'
start = text.find('{')
end = text.rfind('}')
if start != -1 and end != -1 and end > start:
candidate = text[start:end+1]
try:
return json.loads(candidate)
except Exception:
            # Replace single quotes with double quotes and retry (some models return single-quoted JSON-like text)
candidate2 = candidate.replace("'", '"')
try:
return json.loads(candidate2)
except Exception:
return None
return None
def normalize_bbox(bbox, img_w, img_h):
"""
规范化bbox为像素坐标 [x1,y1,x2,y2],并保证边界有效。
支持:
- 像素坐标;
- 归一化坐标(0-1);
- [x1,y1,w,h] 形式(若x2<=x1或y2<=y1则按宽高处理)。
"""
if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
return None
x1, y1, x2, y2 = bbox
    # Possibly normalized coordinates
if 0 <= x1 <= 1 and 0 <= y1 <= 1 and 0 <= x2 <= 1 and 0 <= y2 <= 1:
x1 = int(round(x1 * img_w))
y1 = int(round(y1 * img_h))
x2 = int(round(x2 * img_w))
y2 = int(round(y2 * img_h))
else:
        # Pixel coordinates or [x1, y1, w, h]
x1 = int(round(x1))
y1 = int(round(y1))
x2 = int(round(x2))
y2 = int(round(y2))
        # If x2<=x1 or y2<=y1, treat the last two values as width/height
if x2 <= x1 or y2 <= y1:
x2 = x1 + max(1, x2)
y2 = y1 + max(1, y2)
    # Fix the bounds and add a small margin
if x1 > x2:
x1, x2 = x2, x1
if y1 > y2:
y1, y2 = y2, y1
    # Margin is 2% of the short side (at least 5px, at most 30px)
short_side = min(img_w, img_h)
margin = max(5, min(30, int(0.02 * short_side)))
x1 = max(0, x1 - margin)
y1 = max(0, y1 - margin)
x2 = min(img_w - 1, x2 + margin)
y2 = min(img_h - 1, y2 + margin)
return [x1, y1, x2, y2]
def erase_regions_on_image(image_path: str, coords_map: dict, save_path: str):
img = Image.open(image_path).convert('RGB')
draw = ImageDraw.Draw(img)
w, h = img.size
for key, bbox in coords_map.items():
nb = normalize_bbox(bbox, w, h)
if nb is None:
print(f"坐标格式错误,跳过 {key}: {bbox}")
continue
x1, y1, x2, y2 = nb
draw.rectangle([x1, y1, x2, y2], fill=(255, 255, 255))
print(f"已抹除 {key} 区域: {nb}")
os.makedirs(os.path.dirname(save_path), exist_ok=True)
img.save(save_path)
print(f"清理后的图片已保存: {save_path}")
def draw_debug_boxes(image_path: str, coords_map: dict, save_path: str):
"""在原图上绘制预测的矩形框用于人工核对。"""
img = Image.open(image_path).convert('RGB')
draw = ImageDraw.Draw(img)
w, h = img.size
for key, bbox in coords_map.items():
nb = normalize_bbox(bbox, w, h)
if nb is None:
print(f"跳过无法解析的坐标: {key} -> {bbox}")
continue
x1, y1, x2, y2 = nb
draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 0), width=3)
draw.text((x1, max(0, y1 - 16)), key, fill=(255, 0, 0))
os.makedirs(os.path.dirname(save_path), exist_ok=True)
img.save(save_path)
print(f"调试框已生成: {save_path}")
def convert_ai_json_to_coords_map(result, img_w: int, img_h: int) -> dict:
"""
将AI返回的JSON统一转换为 {label: [x1,y1,x2,y2]} 形式,兼容多种结构:
1) {"rects":[{"text":"AGN","bbox_norm":{x1,y1,x2,y2},"bbox_px":{x1,y1,x2,y2}}]}
2) {"AGN":[x1,y1,x2,y2], "UCLINK":[...], ...}
3) {"rects":[{"label":"AGN","bbox":[x1,y1,x2,y2]}]}
4) {"rects":[{"text":"AGN","x1":...,"y1":...,"x2":...,"y2":...}]}
返回值可以包含像素或归一化坐标,后续由 normalize_bbox 统一处理。
"""
coords_map: dict = {}
def dict_to_list(b):
if isinstance(b, dict):
return [b.get("x1"), b.get("y1"), b.get("x2"), b.get("y2")]
return b
try:
        # Case A: the top level is a dict
if isinstance(result, dict):
            # A1: contains a 'rects' list
if "rects" in result and isinstance(result["rects"], list):
for i, item in enumerate(result["rects"]):
if not isinstance(item, dict):
continue
label = item.get("text") or item.get("label") or item.get("word") or f"rect_{i}"
idx = item.get("occurrence_index")
key = f"{label}#{idx}" if isinstance(idx, int) and idx > 0 else label
bbox_px = dict_to_list(item.get("bbox_px") or item.get("bbox_pixels"))
bbox_norm = dict_to_list(item.get("bbox_norm"))
bbox_generic = dict_to_list(item.get("bbox"))
chosen = None
                    # If both pixel and normalized boxes are present, cross-check them for consistency
if isinstance(bbox_px, (list, tuple)) and len(bbox_px) == 4 and isinstance(bbox_norm, (list, tuple)) and len(bbox_norm) == 4:
try:
px_from_norm = [int(round(float(bbox_norm[0]) * img_w)),
int(round(float(bbox_norm[1]) * img_h)),
int(round(float(bbox_norm[2]) * img_w)),
int(round(float(bbox_norm[3]) * img_h))]
diff = sum(abs(px_from_norm[j] - int(round(float(bbox_px[j])))) for j in range(4))
chosen = bbox_px if diff <= 4 else bbox_norm
except Exception:
chosen = bbox_px
elif isinstance(bbox_px, (list, tuple)) and len(bbox_px) == 4:
chosen = bbox_px
elif isinstance(bbox_norm, (list, tuple)) and len(bbox_norm) == 4:
chosen = bbox_norm
elif isinstance(bbox_generic, (list, tuple)) and len(bbox_generic) == 4:
chosen = bbox_generic
else:
                        # Direct x1, y1, x2, y2 fields
if all(k in item for k in ("x1", "y1", "x2", "y2")):
chosen = [item.get("x1"), item.get("y1"), item.get("x2"), item.get("y2")]
if isinstance(chosen, (list, tuple)) and len(chosen) == 4:
coords_map[key] = list(chosen)
else:
print(f"跳过无法解析的rect: {item}")
else:
                # A2: simple key/value form
for k, v in result.items():
if isinstance(v, (list, tuple)) and len(v) == 4:
coords_map[k] = list(v)
        # Case B: the top level is a list
elif isinstance(result, list):
for i, item in enumerate(result):
if not isinstance(item, dict):
continue
label = item.get("text") or item.get("label") or item.get("word") or f"rect_{i}"
bbox = item.get("bbox_px") or item.get("bbox_norm") or item.get("bbox")
bbox = dict_to_list(bbox)
if isinstance(bbox, (list, tuple)) and len(bbox) == 4:
coords_map[label] = list(bbox)
else:
print("AI返回的JSON结构未知,无法解析。")
except Exception as e:
print(f"解析AI JSON时发生错误: {e}")
return coords_map
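# Illustrative example (sketch only, using the bbox values from the commented sample response below):
# a typical "rects" payload collapses to a flat label -> bbox mapping; bbox_norm values stay
# normalized here and are converted to pixels later by normalize_bbox.
assert convert_ai_json_to_coords_map(
    {"rects": [{"text": "AGN", "occurrence_index": 0,
                "bbox_norm": {"x1": 0.1028, "y1": 0.1934, "x2": 0.1325, "y2": 0.2006}}]},
    img_w, img_h,
) == {"AGN": [0.1028, 0.1934, 0.1325, 0.2006]}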
def images_to_pdf(image_paths, output_pdf):
os.makedirs(os.path.dirname(output_pdf), exist_ok=True)
pil_images = [Image.open(p).convert('RGB') for p in image_paths]
if not pil_images:
raise RuntimeError("没有需要写入PDF的图片")
first = pil_images[0]
rest = pil_images[1:]
first.save(output_pdf, save_all=True, append_images=rest)
print(f"已生成PDF: {output_pdf}")
text = f"""(仅归一化坐标,严格 JSON)
你是一名版面定位助手。请在下图中定位并分别框出以下四个单词:AGN、UCLINK、LOGISITICS、LTD。
坐标系与输出要求:
- 图像尺寸:宽 {img_w} 像素,高 {img_h} 像素。
- 原点位于图像左上角;x 向右增大,y 向下增大。
- 为每个目标词返回它的最小外接矩形框,边界紧贴字形,不要添加额外边距。
- 如果存在多处相同词,按从上到下的顺序全部返回,并用 occurrence_index 标识序号;若只出现一次则为 0。
- 返回坐标为相对宽高的归一化浮点数,范围 [0,1],保留 4 位小数;保证 0 ≤ x1 < x2 ≤ 1,0 ≤ y1 < y2 ≤ 1。
- 禁止任何图片预处理(裁剪、缩放、加边距、重采样);坐标必须对应原始图像。
- 严格只输出下面的压缩的 JSON,不要附加解释或其他文本。
- JSON中不要出现不在实例中的参数,例如bbox_2d
输出 JSON 格式(示例为格式演示,实际数值请识别后填充):"""
text += '[{"text":"AGN","occurrence_index":0,"bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"UCLINK","occurrence_index":0,"bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"LOGISITICS","occurrence_index":0,"bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"LTD","occurrence_index":0,"bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}}]'
# text += "例如:{\"AGN\": [0.1, 0.2, 0.3, 0.4], \"UCLINK\": [0.5, 0.6, 0.7, 0.8], \"LOGISITICS\": [0.9, 0.10, 0.11, 0.12], \"LTD\": [0.13, 0.14, 0.15, 0.16]}。确保json格式的正确性"
completion = client.chat.completions.create(
model="qwen3-vl-plus", # 此处以qwen3-vl-plus为例,可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/models
messages=[
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": image_base64
},
},
{"type": "text", "text": text},
],
},
],
temperature=0.1,
)
raw_text = completion.choices[0].message.content
# raw_text = '```json[{"bbox_norm": {"x1": 0.1028, "y1": 0.1934, "x2": 0.1325, "y2": 0.2006}, "text": "AGN", "occurrence_index": 0},{"bbox_norm": {"x1": 0.1028, "y1": 0.2057, "x2": 0.1608, "y2": 0.2165}, "text": "UCLINK", "occurrence_index": 0},{"bbox_norm": {"x1": 0.1677, "y1": 0.2057, "x2": 0.2657, "y2": 0.2165}, "text": "LOGISITICS", "occurrence_index": 0},{"bbox_norm": {"x1": 0.2726, "y1": 0.2057, "x2": 0.3023, "y2": 0.2165}, "text": "LTD", "occurrence_index": 0}]```'
print(raw_text)
result = safe_extract_json(raw_text)
if result is None or not isinstance(result, dict):
raise RuntimeError("模型返回内容无法解析为JSON坐标,请检查返回格式。")
# Only process the first page: write the redacted image to output/cleaned_page_1.png, then rebuild the PDF
cleaned_dir = os.path.join("./output")
cleaned_first = os.path.join(cleaned_dir, "cleaned_page_1.png")
debug_first = os.path.join(cleaned_dir, "debug_page_1.png")
coords_map = convert_ai_json_to_coords_map(result, img_w, img_h)
if not coords_map:
raise RuntimeError("无法从AI返回中提取矩形框坐标,请检查输出格式或提示词。")
print(f"解析并统一后的坐标字典: {coords_map}")
draw_debug_boxes(image_paths[0], coords_map, debug_first)
erase_regions_on_image(image_paths[0], coords_map, cleaned_first)
# Assemble the PDF: use the cleaned image for page 1 and keep the original images for the remaining pages
final_images = [cleaned_first] + image_paths[1:]
images_to_pdf(final_images, os.path.join(cleaned_dir, "cleaned.pdf"))
......@@ -16,7 +16,7 @@ client = OpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
pdf_path = "./43610272216.pdf"
pdf_path = "C:/Users/Administrator/Desktop/43610281036.pdf"
def pdf_to_images(pdf_path, output_dir='./pdf_pages', dpi=150):
"""
......@@ -279,11 +279,13 @@ text = f"""(仅归一化坐标,严格 JSON)
- 返回坐标为相对宽高的归一化浮点数,范围 [0,1],保留 4 位小数;保证 0 ≤ x1 < x2 ≤ 1,0 ≤ y1 < y2 ≤ 1。
- 禁止任何图片预处理(裁剪、缩放、加边距、重采样);坐标必须对应原始图像。
- 严格只输出下面的压缩的 JSON,不要附加解释或其他文本。
- JSON中不要出现不在实例中的参数,例如bbox_2d
- JSON中不要出现不在实例中的参数,例如bbox_2d,确保bbox_norm中有且仅有x1,y1,x2,y2四个参数。
输出 JSON 格式(示例为格式演示,实际数值请识别后填充):"""
text += '[{"text":"AGN","bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"UCLINK","bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"LOGISITICS","bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}},{"text":"LTD","bbox_norm":{"x1":0.0000,"y1":0.0000,"x2":0.0000,"y2":0.0000}}]'
# Record the AI processing start time
ai_start_time = time.time()
completion = client.chat.completions.create(
model="qwen3-vl-plus", # 此处以qwen3-vl-plus为例,可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/models
messages=[
......@@ -302,6 +304,9 @@ completion = client.chat.completions.create(
],
temperature=0.1,
)
# Record the AI processing end time
ai_end_time = time.time()
ai_processing_time = ai_end_time - ai_start_time
raw_text = completion.choices[0].message.content
# raw_text = '```json[{"bbox_norm": {"x1": 0.1028, "y1": 0.1934, "x2": 0.1325, "y2": 0.2006}, "text": "AGN", "occurrence_index": 0},{"bbox_norm": {"x1": 0.1028, "y1": 0.2057, "x2": 0.1608, "y2": 0.2165}, "text": "UCLINK", "occurrence_index": 0},{"bbox_norm": {"x1": 0.1677, "y1": 0.2057, "x2": 0.2657, "y2": 0.2165}, "text": "LOGISITICS", "occurrence_index": 0},{"bbox_norm": {"x1": 0.2726, "y1": 0.2057, "x2": 0.3023, "y2": 0.2165}, "text": "LTD", "occurrence_index": 0}]```'
......@@ -325,5 +330,7 @@ erase_regions_on_image(image_paths[0], coords_map, cleaned_first)
final_images = [cleaned_first] + image_paths[1:]
images_to_pdf(final_images, os.path.join(cleaned_dir, "cleaned.pdf"))
end_time = time.time()
print(f"耗时: {end_time - begin_time} 秒")
total_time = end_time - begin_time
print(f"总耗时: {total_time:.2f} 秒")
print(f"AI处理耗时: {ai_processing_time:.2f} 秒(AI API调用时间)")
ccs_base/wizard/output/debug_page_1.png (binary image, 176.5 KB) updated
......@@ -2,6 +2,8 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
from odoo import models, fields, api, _
import pytz
from datetime import datetime
_logger = logging.getLogger(__name__)
......@@ -37,9 +39,13 @@ class BatchGetPodInfoWizard(models.TransientModel):
_logger.warning(f"提单 {bl.bl_no} 未提取到时间信息,跳过节点推送")
continue
            # Use the earliest time as the extraction time
extract_time = min(extracted_times)
_logger.info(f"提单 {bl.bl_no} 最早提取时间: {extract_time}")
            # Use the earliest time as the extraction time (local time in the user's timezone)
extract_time_local = min(extracted_times)
_logger.info(f"提单 {bl.bl_no} 最早提取时间(本地): {extract_time_local}")
            # Convert the extraction time from the user's timezone to UTC (zone 0)
extract_time = self._convert_local_time_to_utc(extract_time_local)
_logger.info(f"提单 {bl.bl_no} 最早提取时间(UTC): {extract_time}")
            # Get the small-parcel records
ship_packages = bl.ship_package_ids
......@@ -217,11 +223,15 @@ class BatchGetPodInfoWizard(models.TransientModel):
if interval_time:
            # Compute extraction time minus the operation time of the parcel's current node
time_diff = extract_time - current_node_operation_time
if time_diff < interval_time:
_logger.warning(f"小包 {package.id} 时间差 {time_diff} < 前序间隔时间 {interval_time},不满足补推条件")
            # Convert the time difference to minutes
time_diff_minutes = time_diff.total_seconds() / 60
            # Convert the interval time to minutes
interval_minutes = interval_time.total_seconds() / 60
if time_diff_minutes < interval_minutes:
_logger.warning(f"小包 {package.id} 时间差 {time_diff_minutes:.1f} 分钟 < 前序间隔时间 {interval_minutes:.1f} 分钟,不满足补推条件")
return False
else:
_logger.info(f"小包 {package.id} 时间差 {time_diff} >= 前序间隔时间 {interval_time},满足补推条件")
_logger.info(f"小包 {package.id} 时间差 {time_diff_minutes:.1f} 分钟 >= 前序间隔时间 {interval_minutes:.1f} 分钟,满足补推条件")
_logger.info(f"小包 {package.id} 满足推送条件")
return True
......@@ -271,3 +281,40 @@ class BatchGetPodInfoWizard(models.TransientModel):
_logger.error(f"获取小包 {package.id} 前序间隔时间失败: {str(e)}")
return None
def _convert_local_time_to_utc(self, local_time):
"""
将本地时间(用户时区)转换为UTC时间(0时区)
:param local_time: 本地时间的datetime对象(naive或aware)
:return: UTC时间的datetime对象(naive,用于数据库存储)
"""
try:
            # Get the user's timezone; if not set, fall back to the system default (usually Asia/Shanghai or UTC)
user_tz_name = self.env.user.tz
if not user_tz_name:
                # Try the default timezone from the system configuration; otherwise use UTC
user_tz_name = self.env['ir.config_parameter'].sudo().get_param('timezone') or 'UTC'
user_tz = pytz.timezone(user_tz_name)
            # If local_time is a naive datetime (no tzinfo), assume it is in the user's timezone
if local_time.tzinfo is None:
                # Attach the user's timezone to the naive datetime
local_time_aware = user_tz.localize(local_time)
else:
                # If it is already an aware datetime, convert it to the user's timezone first, then to UTC
local_time_aware = local_time.astimezone(user_tz)
            # Convert to UTC
utc_time = local_time_aware.astimezone(pytz.UTC)
            # Return a naive UTC datetime (Odoo normally stores naive UTC datetimes in the database)
utc_time_naive = utc_time.replace(tzinfo=None)
_logger.info(f"时区转换: 本地时间 {local_time} (用户时区: {user_tz_name}) -> UTC时间 {utc_time_naive}")
return utc_time_naive
except Exception as e:
_logger.error(f"时区转换失败: {str(e)},使用原时间(假设已经是UTC)")
            # If conversion fails, return the original time with tzinfo stripped (assume it is already UTC)
return local_time.replace(tzinfo=None) if hasattr(local_time, 'tzinfo') and local_time.tzinfo else local_time
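For reference, a minimal standalone sketch of the conversion that _convert_local_time_to_utc performs, outside of Odoo (Asia/Shanghai is only an example timezone here, not something this commit hard-codes):

import pytz
from datetime import datetime

def local_to_utc_naive(local_dt, tz_name="Asia/Shanghai"):
    """Localize a naive datetime to tz_name, convert it to UTC, and drop tzinfo for storage."""
    tz = pytz.timezone(tz_name)
    aware = tz.localize(local_dt) if local_dt.tzinfo is None else local_dt.astimezone(tz)
    return aware.astimezone(pytz.UTC).replace(tzinfo=None)

# A POD time read from the PDF as 2024-05-01 10:30 local time becomes 02:30 UTC,
# which is the value compared against the parcel node's operation time.
print(local_to_utc_naive(datetime(2024, 5, 1, 10, 30)))  # 2024-05-01 02:30:00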