Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
H
hh_ccs
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
贺阳
hh_ccs
Commits
5049b46c
提交
5049b46c
authored
10月 29, 2025
作者:
贺阳
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
涂抹预览
上级
145b4da1
隐藏空白字符变更
内嵌
并排
正在显示
4 个修改的文件
包含
739 行增加
和
46 行删除
+739
-46
ai_image_edit_service.py
ccs_base/wizard/ai_image_edit_service.py
+88
-0
batch_get_pod_info_wizard.py
ccs_base/wizard/batch_get_pod_info_wizard.py
+566
-36
batch_get_pod_info_wizard_views.xml
ccs_base/wizard/batch_get_pod_info_wizard_views.xml
+16
-10
image-to-image.py
ccs_base/wizard/image-to-image.py
+69
-0
没有找到文件。
ccs_base/wizard/ai_image_edit_service.py
0 → 100644
浏览文件 @
5049b46c
# -*- coding: utf-8 -*-
import
base64
import
requests
from
dashscope
import
MultiModalConversation
import
dashscope
import
logging
_logger
=
logging
.
getLogger
(
__name__
)
# 设置DashScope的API地址
dashscope
.
base_http_api_url
=
'https://dashscope.aliyuncs.com/api/v1'
class AIImageEditService:
    """AI image-editing service built on the DashScope ``qwen-image-edit`` model.

    The service sends a base64-encoded image plus a text instruction to the
    model, then downloads the edited image the model returns and hands it back
    as base64.
    """

    # SECURITY NOTE(review): this credential is committed to source control and
    # should be rotated and removed. It is kept only as a last-resort fallback
    # so existing deployments that rely on the old default keep working.
    _FALLBACK_API_KEY = 'sk-e41914f0d9c94035a5ae1322e9a61fb1'
    # Environment variable consulted when no key is passed explicitly.
    _API_KEY_ENV_VAR = 'DASHSCOPE_API_KEY'

    def __init__(self, api_key=None):
        """
        :param api_key: DashScope API key. When omitted (``None``) the key is
            read from the ``DASHSCOPE_API_KEY`` environment variable, falling
            back to the legacy built-in key.
        """
        # Local import keeps this class self-contained; the module itself does
        # not import ``os``.
        import os

        if api_key is None:
            api_key = os.getenv(self._API_KEY_ENV_VAR) or self._FALLBACK_API_KEY
        self.api_key = api_key
        self.model = "qwen-image-edit"

    def edit_image_remove_text(self, image_base64, text_to_remove="AGN UCLINK LOGISITICS LTD"):
        """
        Ask the AI model to erase the given text from an image.

        :param image_base64: base64-encoded image (sent as a PNG data URI)
        :param text_to_remove: text that should be erased from the image
        :return: base64 of the edited image, or None on any failure
        """
        try:
            # Single-turn conversation: one image part plus one instruction part.
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"image": f"data:image/png;base64,{image_base64}"},
                        {"text": f"将图片中的{text_to_remove}这一段文字抹去,保持背景完全一致"},
                    ],
                }
            ]

            response = MultiModalConversation.call(
                api_key=self.api_key,
                model=self.model,
                messages=messages,
                stream=False,
                watermark=False,
                negative_prompt=" ",
            )

            if response.status_code != 200:
                _logger.error(f"AI图片编辑失败,HTTP返回码:{response.status_code}")
                _logger.error(f"错误码:{response.code}")
                _logger.error(f"错误信息:{response.message}")
                return None

            # The model answers with a URL pointing at the edited image.
            image_url = response.output.choices[0].message.content[0]['image']
            _logger.info(f"AI图片编辑成功,图片URL: {image_url}")

            # Download the result and convert it to base64 (None if that fails).
            return self.download_and_convert_to_base64(image_url)
        except Exception as e:
            # Best-effort API: callers expect None on failure, but keep the
            # traceback in the log so failures can be diagnosed.
            _logger.exception(f"AI图片编辑异常: {str(e)}")
            return None

    def download_and_convert_to_base64(self, image_url, timeout=300):
        """
        Download an image and return its content as base64.

        :param image_url: URL of the image
        :param timeout: request timeout in seconds
        :return: base64-encoded image data, or None if the download fails
        """
        try:
            # The context manager releases the connection even when
            # raise_for_status() raises.
            with requests.get(image_url, stream=True, timeout=timeout) as response:
                response.raise_for_status()
                image_data = response.content

            image_base64 = base64.b64encode(image_data).decode('utf-8')
            _logger.info("图片下载并转换为base64成功")
            return image_base64
        except requests.exceptions.RequestException as e:
            _logger.error(f"图片下载失败: {str(e)}")
            return None
ccs_base/wizard/batch_get_pod_info_wizard.py
浏览文件 @
5049b46c
...
...
@@ -10,6 +10,7 @@ import time
import
requests
from
odoo
import
models
,
fields
,
_
from
odoo.exceptions
import
ValidationError
from
.ai_image_edit_service
import
AIImageEditService
_logger
=
logging
.
getLogger
(
__name__
)
...
...
@@ -55,6 +56,177 @@ class BatchGetPodInfoWizard(models.TransientModel):
string
=
'Show Error Message'
,
help
=
'Show error message'
)
# PDF-related fields
# Binary field written by _merge_pdf_files with the merged (redacted) PDF.
pdf_file = fields.Binary(string='PDF文件', help='涂抹后的所有pdf文件合并为一个pdf文件')
# File name stored alongside pdf_file.
pdf_filename = fields.Char(string='PDF文件名称')
# JSON produced by _serialize_processed_files: per-BL references
# (attachment ids, file names, OCR texts), not the file payloads themselves.
processed_files_data = fields.Text(string='已处理的文件数据', help='存储已处理的文件信息(JSON格式)')
def _cleanup_temp_attachments(self):
    """
    Remove the temporary attachments that belong to this wizard record.

    Attachments are selected by this model name, this record id and a name
    matching ``temp_pod_%``. Any failure is logged and swallowed.
    """
    try:
        domain = [
            ('res_model', '=', self._name),
            ('res_id', '=', self.id),
            ('name', 'like', 'temp_pod_%'),
        ]
        temp_attachments = self.env['ir.attachment'].search(domain)
        if not temp_attachments:
            return
        # Collect the names first: they are no longer readable after unlink().
        attachment_names = [record.name for record in temp_attachments]
        temp_attachments.unlink()
        _logger.info(f"已清理临时附件: {attachment_names}")
    except Exception as e:
        _logger.error(f"清理临时附件失败: {str(e)}")
def _serialize_processed_files(self, processed_files):
    """
    Serialize ``processed_files`` to a JSON string, parking the file payloads
    in temporary attachments.

    :param processed_files: list of processed file dicts
    :return: JSON string holding only references (no file data)
    """
    # Start from a clean slate: drop attachments left by a previous run.
    self._cleanup_temp_attachments()

    entries = []
    for item in processed_files:
        bl = item.get('bl')
        if not bl:
            continue

        payload = item.get('file_data', '')
        file_name = item.get('file_name', f"{bl.bl_no}.pdf")

        # Store the payload in a temporary attachment and keep only its id.
        attachment_id = None
        if payload:
            try:
                attachment_vals = {
                    'name': f"temp_pod_{bl.bl_no}_{int(time.time())}.pdf",
                    'datas': payload,
                    'type': 'binary',
                    'res_model': self._name,
                    'res_id': self.id,
                    'delete_old': True,
                }
                attachment = self.env['ir.attachment'].create(attachment_vals)
                attachment_id = attachment.id
                _logger.info(f"已创建临时附件存储文件: {attachment.name}, ID: {attachment_id}")
            except Exception as e:
                _logger.error(f"创建临时附件失败: {str(e)}")

        entry = {
            'bl_id': bl.id,
            'bl_no': bl.bl_no,
            'file_name': file_name,
            # Reference to the attachment instead of the raw file data.
            'attachment_id': attachment_id,
        }
        # OCR text is small enough to be stored inline.
        if 'ocr_texts' in item:
            entry['ocr_texts'] = item['ocr_texts']
        entries.append(entry)

    return json.dumps(entries, ensure_ascii=False)
def _deserialize_processed_files(self, json_data):
    """
    Rebuild ``processed_files`` from its JSON form, reading file payloads back
    from the referenced attachments.

    :param json_data: JSON string
    :return: list of processed file dicts (empty on missing or invalid input)
    """
    if not json_data:
        return []

    try:
        restored = []
        for entry in json.loads(json_data):
            bl_id = entry.get('bl_id')
            attachment_id = entry.get('attachment_id')
            if not bl_id:
                continue

            bl = self.env['cc.bl'].browse(bl_id)
            if not bl.exists():
                continue

            # Load the payload from the attachment; fall back to an empty string.
            payload = ''
            if attachment_id:
                try:
                    attachment = self.env['ir.attachment'].browse(attachment_id)
                    if attachment.exists():
                        payload = attachment.datas
                        _logger.info(f"从附件读取文件: {attachment.name}, ID: {attachment_id}")
                    else:
                        _logger.warning(f"附件不存在: {attachment_id}")
                except Exception as e:
                    _logger.error(f"读取附件失败: {str(e)}")

            file_info = {
                'bl': bl,
                'bl_no': entry.get('bl_no', ''),
                'file_name': entry.get('file_name', ''),
                'file_data': payload,
            }
            # Restore the OCR texts when they were serialized.
            if 'ocr_texts' in entry:
                file_info['ocr_texts'] = entry['ocr_texts']
            restored.append(file_info)

        return restored
    except Exception as e:
        _logger.error(f"反序列化processed_files失败: {str(e)}")
        return []
def action_preview(self):
    """
    Preview action: fetch the PDFs, match them to bills of lading, optionally
    redact text, merge everything into one PDF and reopen this wizard.

    :return: an ``ir.actions.act_window`` dict reopening this wizard record;
        on failure the error is stored in ``show_error_message`` and the
        wizard is reopened as well
    """
    try:
        bl_objs = self.get_order()
        _logger.info(f"开始预览操作,提单数量: {len(bl_objs)}")

        # Fetch the PDF files from the interface and match them to BL records.
        pdf_file_arr = self._get_pdf_file_arr()
        processed_files = self._match_bl_by_file_name(pdf_file_arr)

        # Report the BLs for which no file was matched.
        matched_bl_ids = {f['bl'].id for f in processed_files if f.get('bl')}
        error_bl = [bl for bl in bl_objs if bl.id not in matched_bl_ids]
        if error_bl:
            # Use the module logger (not the root logger) like the rest of this method.
            _logger.info('%s个提单无法找到release note文件' % len(error_bl))
            if not self._context.get('is_skip_raise_error'):
                self.show_error_message = _('%s bill of loading cannot find release note file') % (
                    ', '.join([bl.bl_no for bl in error_bl]))

        # Redact the specified text when the option is enabled.
        if self.remove_specified_text and processed_files:
            processed_files = self._remove_specified_text(processed_files, debug_mode=False)

        # Merge the PDFs into the pdf_file field.
        self._merge_pdf_files(processed_files)

        # Persist the processed files so that confirm() can reuse them.
        if processed_files:
            self.processed_files_data = self._serialize_processed_files(processed_files)
            _logger.info(f"预览完成,已处理 {len(processed_files)} 个文件")
        else:
            self.processed_files_data = ''

        active_ids = bl_objs.ids
    except Exception as e:
        # Keep the traceback in the log; the user only sees the message.
        _logger.exception(f"预览操作失败: {str(e)}")
        self.show_error_message = _('预览操作失败: %s') % str(e)
        active_ids = self._context.get('active_id', [])

    # Reopen this wizard's form view in both the success and the failure case.
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'batch.get.pod.info.wizard',
        'view_mode': 'form',
        'res_id': self.id,
        'target': 'new',
        'context': {'active_id': active_ids},
    }
def
confirm
(
self
):
"""
...
...
@@ -65,42 +237,69 @@ class BatchGetPodInfoWizard(models.TransientModel):
self
.
show_error_message
=
False
bl_objs
=
self
.
get_order
()
_logger
.
info
(
f
"
%
s提单开始执行批量获取POD信息操作"
%
len
(
bl_objs
))
# 调用接口获取提单pdf文件
pdf_file_arr
=
self
.
_get_pdf_file_arr
()
# 处理PDF文件,匹配提单对象
processed_files
=
self
.
_match_bl_by_file_name
(
pdf_file_arr
)
# 把没有匹配到文件的进行提示
error_bl
=
[]
matched_bl_ids
=
[
f
[
'bl'
]
.
id
for
f
in
processed_files
if
f
.
get
(
'bl'
)]
for
bl
in
bl_objs
:
if
bl
.
id
not
in
matched_bl_ids
:
error_bl
.
append
(
bl
)
if
error_bl
:
logging
.
info
(
'
%
s个提单无法找到release note文件'
%
len
(
error_bl
))
# 英文提示
if
not
self
.
_context
.
get
(
'is_skip_raise_error'
):
self
.
show_error_message
=
_
(
'
%
s bill of loading cannot find release note file'
)
%
(
', '
.
join
([
bl
.
bl_no
for
bl
in
error_bl
]))
# raise ValidationError(_('%s bill of loading cannot find release note file') % (
# ', '.join([bl.bl_no for bl in error_bl]))) # xx提单无法找到release note文件
if
self
.
remove_specified_text
:
# 临时启用调试模式,查看删除位置
processed_files
=
self
.
_remove_specified_text
(
processed_files
,
debug_mode
=
False
)
# 用于测试的:保存处理后的PDF并返回下载链接
# if processed_files and processed_files[0].get('file_data'):
# return self._save_and_return_download_link(processed_files[0])
# 优先使用已处理的文件数据(预览时已处理)
processed_files
=
None
if
self
.
processed_files_data
:
processed_files
=
self
.
_deserialize_processed_files
(
self
.
processed_files_data
)
_logger
.
info
(
f
"使用已处理的文件数据,共 {len(processed_files)} 个文件"
)
# 如果没有已处理的数据,则执行处理流程
if
not
processed_files
:
# 调用接口获取提单pdf文件
pdf_file_arr
=
self
.
_get_pdf_file_arr
()
# 处理PDF文件,匹配提单对象
processed_files
=
self
.
_match_bl_by_file_name
(
pdf_file_arr
)
# 把没有匹配到文件的进行提示
error_bl
=
[]
matched_bl_ids
=
[
f
[
'bl'
]
.
id
for
f
in
processed_files
if
f
.
get
(
'bl'
)]
for
bl
in
bl_objs
:
if
bl
.
id
not
in
matched_bl_ids
:
error_bl
.
append
(
bl
)
if
error_bl
:
logging
.
info
(
'
%
s个提单无法找到release note文件'
%
len
(
error_bl
))
# 英文提示
if
not
self
.
_context
.
get
(
'is_skip_raise_error'
):
self
.
show_error_message
=
_
(
'
%
s bill of loading cannot find release note file'
)
%
(
', '
.
join
([
bl
.
bl_no
for
bl
in
error_bl
]))
# 如果启用了涂抹文字,进行处理
if
self
.
remove_specified_text
and
processed_files
:
processed_files
=
self
.
_remove_specified_text
(
processed_files
,
debug_mode
=
False
)
# 合并PDF并保存到pdf_file字段
self
.
_merge_pdf_files
(
processed_files
)
# 如果有处理后的文件,序列化存储
if
processed_files
:
self
.
processed_files_data
=
self
.
_serialize_processed_files
(
processed_files
)
# 跳转到本向导的form视图(显示合并后的PDF)
return
{
'type'
:
'ir.actions.act_window'
,
'res_model'
:
'batch.get.pod.info.wizard'
,
'view_mode'
:
'form'
,
'res_id'
:
self
.
id
,
'target'
:
'new'
,
'context'
:
{
'active_id'
:
bl_objs
.
ids
,}
}
# 回写到附件信息
if
processed_files
:
logging
.
info
(
f
"回写PDF文件到清关文件,共 {len(processed_files)} 个文件"
)
# 回写PDF文件到清关文件
self
.
_write_pdf_file
(
processed_files
)
# 再同步和回写
if
self
.
sync_last_mile_pod
and
processed_files
:
logging
.
info
(
f
"同步和回写尾程POD信息,共 {len(processed_files)} 个文件"
)
self
.
_sync_last_mile_pod
(
processed_files
)
# 同步推送匹配节点
if
self
.
sync_match_node
and
processed_files
:
logging
.
info
(
f
"同步推送匹配节点,共 {len(processed_files)} 个文件"
)
self
.
get_date_sync_match_node
(
processed_files
)
# 清理临时附件
self
.
_cleanup_temp_attachments
()
end_time
=
time
.
time
()
_logger
.
info
(
f
"批量获取POD信息操作完成,耗时: {end_time - start_time}秒"
)
if
self
.
show_error_message
and
not
self
.
_context
.
get
(
'is_skip_raise_error'
):
...
...
@@ -194,6 +393,8 @@ class BatchGetPodInfoWizard(models.TransientModel):
Write PDF file to clearance files # 回写PDF文件到清关文件
:param processed_files: 处理后的文件数组
"""
logging
.
info
(
'processed_files:
%
s'
%
processed_files
)
logging
.
info
(
'processed_files type:
%
s'
%
type
(
processed_files
))
for
file_info
in
processed_files
:
if
not
file_info
[
'bl'
]:
continue
...
...
@@ -221,6 +422,74 @@ class BatchGetPodInfoWizard(models.TransientModel):
})
file_info
[
'clearance_file'
]
=
clearance_file
def _merge_pdf_files(self, processed_files):
    """
    Merge all processed PDFs into a single PDF and store it in ``pdf_file``.

    Entries without a BL or without file data are skipped. A PDF that cannot
    be opened or merged is logged and skipped, and is not counted in the
    generated file name or in the success log. Errors are logged, never raised.

    :param processed_files: list of processed file dicts
    """
    import fitz  # PyMuPDF
    from datetime import datetime

    merged_pdf = None
    try:
        # New empty document that receives the pages of every source PDF.
        merged_pdf = fitz.open()
        bl_numbers = []

        for file_info in processed_files:
            if not file_info.get('bl') or not file_info.get('file_data'):
                continue

            bl = file_info['bl']
            file_data = file_info['file_data']

            try:
                pdf_binary = base64.b64decode(file_data)
                source_pdf = fitz.open(stream=pdf_binary, filetype="pdf")
                try:
                    merged_pdf.insert_pdf(source_pdf)
                finally:
                    # Close the source even when insert_pdf fails.
                    source_pdf.close()
                # Record the BL only once its pages are really in the output.
                bl_numbers.append(bl.bl_no)
                _logger.info(f"已添加提单 {bl.bl_no} 的PDF到合并文档")
            except Exception as e:
                _logger.error(f"合并提单 {bl.bl_no} 的PDF失败: {str(e)}")
                continue

        if len(merged_pdf) > 0:
            # Save in memory and convert to base64 for the Binary field.
            output_buffer = io.BytesIO()
            merged_pdf.save(output_buffer, garbage=4, deflate=True, clean=True)
            merged_pdf_base64 = base64.b64encode(output_buffer.getvalue()).decode('utf-8')

            # File name: at most 5 BL numbers, then the total count, plus a timestamp.
            bl_numbers_str = '_'.join(bl_numbers[:5])
            if len(bl_numbers) > 5:
                bl_numbers_str += f'_等{len(bl_numbers)}个'
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            pdf_filename = f"合并POD文件_{bl_numbers_str}_{timestamp}.pdf"

            self.write({
                'pdf_file': merged_pdf_base64,
                'pdf_filename': pdf_filename
            })
            _logger.info(f"成功合并 {len(bl_numbers)} 个PDF文件,文件名: {pdf_filename}")
        else:
            _logger.warning("没有有效的PDF文件可以合并")
    except Exception as e:
        _logger.error(f"合并PDF文件失败: {str(e)}")
    finally:
        # Release the merged document on every path (empty result, error, success).
        if merged_pdf is not None:
            merged_pdf.close()
def
_match_bl_by_file_name
(
self
,
pdf_file_arr
):
"""
Match BL by file name and return processed array # 根据文件名匹配提单并返回处理后的数组
...
...
@@ -266,14 +535,136 @@ class BatchGetPodInfoWizard(models.TransientModel):
clearance_file
.
action_sync
()
# 同步尾程POD
_logger
.
info
(
f
"Successfully synced POD for BL {bl.bl_no}"
)
def _check_target_texts_exist(self, pdf_binary, bl_no):
    """
    Check whether the target texts are still present in a PDF.

    Every page is checked against both its PDF text layer and the OCR result
    of the page rendered at 2x zoom. At most one hit is reported per page.

    :param pdf_binary: PDF binary data
    :param bl_no: bill of lading number (used in log messages only)
    :return: tuple ``(exists, found_texts)``; on any error ``(False, [])`` is
        returned so that a failed check does not raise a false alarm
    """
    import fitz  # PyMuPDF
    import pytesseract
    import numpy as np
    from PIL import Image
    import re

    # Target texts (the original comment states these mirror _find_target_texts).
    TARGET_TEXTS = ['AGN', 'UCLINK LOGISITICS LTD', 'UCLINK LOGISITICS', 'UCLINK', 'LOGISITICS', 'LOGISTICS', 'LTD',
                    'UCLINKLOGISITICSLTD']
    EXCLUDE_TEXTS = ['AIR EQK', 'ARN', 'EQK', 'AIR', 'Page 1 of 1', 'Page 2 of 2', 'Page 3 of 3', 'Page 4 of 4',
                     'Page 5 of 5']
    # Loop-invariant Tesseract configuration.
    ocr_config = '--psm 6 --oem 1 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789.,- -c preserve_interword_spaces=1'

    pdf_document = None
    try:
        # Configure the Tesseract binary path.
        self._setup_tesseract_path()

        pdf_document = fitz.open(stream=pdf_binary, filetype="pdf")
        found_texts = []

        # OpenCV is optional; fall back to PIL when it is not installed.
        try:
            import cv2
            cv2_available = True
        except ImportError:
            cv2_available = False

        for page_num in range(len(pdf_document)):
            page = pdf_document[page_num]

            # Text layer first (covers text-based PDFs).
            page_text_pdf = page.get_text().upper()

            # Render the page to an image for OCR.
            mat = fitz.Matrix(2.0, 2.0)  # higher resolution
            pix = page.get_pixmap(matrix=mat)
            img_data = pix.tobytes("png")

            # Build an RGB PIL image from the rendered PNG.
            if cv2_available:
                nparr = np.frombuffer(img_data, np.uint8)
                img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                pil_img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            else:
                pil_img = Image.open(io.BytesIO(img_data))
                if pil_img.mode != 'RGB':
                    pil_img = pil_img.convert('RGB')

            try:
                ocr_text = pytesseract.image_to_string(pil_img, config=ocr_config, lang='eng').upper()
            except Exception as e:
                _logger.warning(f"OCR识别失败,第{page_num + 1}页,使用PDF文本: {str(e)}")
                ocr_text = page_text_pdf

            # Check the text layer and the OCR output together.
            combined_text = (page_text_pdf + ' ' + ocr_text).upper()

            for target_text in TARGET_TEXTS:
                target_upper = target_text.upper()

                is_match = False
                if target_text == 'AGN':
                    # AGN must match as a whole word.
                    if re.search(r'\bAGN\b', combined_text):
                        is_match = True
                elif target_text == 'LTD':
                    # LTD must be a whole word and only counts next to UCLINK.
                    if re.search(r'\bLTD\b', combined_text) and 'UCLINK' in combined_text:
                        is_match = True
                else:
                    # Other targets use substring matching.
                    if target_upper in combined_text:
                        # Only counts when neither AIR EQK nor ARN is on the page.
                        if 'AIR EQK' not in combined_text and 'ARN' not in combined_text:
                            is_match = True

                if is_match:
                    # A match is discarded when the page also hits the exclusion rules.
                    is_excluded = False
                    for exclude_text in EXCLUDE_TEXTS:
                        exclude_upper = exclude_text.upper()
                        if exclude_upper in combined_text and target_upper in combined_text:
                            # Page-number patterns.
                            if re.search(r'PAGE\s+\d+\s+OF\s+\d+', combined_text) or re.search(r'\d+\s*/\s*\d+',
                                                                                              combined_text):
                                is_excluded = True
                                break
                            # AIR EQK / ARN exclusions.
                            if 'AIR EQK' in combined_text or 'ARN' in combined_text:
                                is_excluded = True
                                break

                    if not is_excluded:
                        found_texts.append(f"第{page_num + 1}页: {target_text}")
                        break  # one hit per page is enough; avoids duplicates

        if found_texts:
            _logger.warning(f"提单 {bl_no} 仍存在目标文字: {', '.join(found_texts)}")
            return True, found_texts
        else:
            _logger.info(f"提单 {bl_no} 未发现目标文字")
            return False, []

    except Exception as e:
        _logger.error(f"检查目标文字失败,提单号: {bl_no}, 错误: {str(e)}")
        # When the check itself fails, assume the text is absent (avoid false alarms).
        return False, []
    finally:
        # Release the document on every path, including errors.
        if pdf_document is not None:
            pdf_document.close()
def
_remove_specified_text
(
self
,
processed_files
,
debug_mode
=
False
):
"""
Remove specified text from PDF files using OCR recognition # 使用OCR识别涂抹指定文字
移除PDF中的指定文字:先用OCR处理,检查是否还存在,如果存在则用AI处理,再次检查
:param processed_files: 处理后的文件数组
:param debug_mode: 是否显示调试标记
:return: 处理后的文件数组(包含处理后的PDF数据)
"""
updated_files
=
[]
error_messages
=
[]
for
file_info
in
processed_files
:
if
not
file_info
[
'bl'
]:
...
...
@@ -283,7 +674,6 @@ class BatchGetPodInfoWizard(models.TransientModel):
file_data
=
file_info
[
'file_data'
]
processed_file_data
=
file_data
# 默认使用原始数据
# 使用OCR识别和删除指定文字
if
file_data
:
# 将base64数据转换为二进制
pdf_binary
=
base64
.
b64decode
(
file_data
)
...
...
@@ -292,21 +682,71 @@ class BatchGetPodInfoWizard(models.TransientModel):
if
'ocr_texts'
not
in
file_info
:
file_info
[
'ocr_texts'
]
=
self
.
_extract_text_from_pdf_with_ocr
(
pdf_binary
,
bl
.
bl_no
)
# 使用OCR方法处理PDF
processed_pdf
=
self
.
_process_pdf_with_ocr
(
pdf_data
=
pdf_binary
,
bl_no
=
bl
.
bl_no
,
debug_mode
=
debug_mode
)
if
processed_pdf
:
# 将处理后的PDF转换回base64
processed_file_data
=
base64
.
b64encode
(
processed_pdf
)
.
decode
(
'utf-8'
)
# 第一步:使用OCR方法处理PDF
_logger
.
info
(
f
"提单 {bl.bl_no} 开始OCR处理"
)
try
:
processed_pdf
=
self
.
_process_pdf_with_ocr
(
pdf_data
=
pdf_binary
,
bl_no
=
bl
.
bl_no
,
debug_mode
=
debug_mode
)
if
processed_pdf
:
processed_file_data
=
base64
.
b64encode
(
processed_pdf
)
.
decode
(
'utf-8'
)
# 第二步:检查是否还存在目标文字
pdf_for_check
=
base64
.
b64decode
(
processed_file_data
)
text_exists
,
found_texts
=
self
.
_check_target_texts_exist
(
pdf_for_check
,
bl
.
bl_no
)
if
text_exists
:
# 第三步:如果还存在,使用AI图片编辑处理
_logger
.
info
(
f
"提单 {bl.bl_no} OCR处理后仍存在目标文字,使用AI图片编辑处理"
)
try
:
ai_processed_pdf
=
self
.
_process_pdf_with_ai_image_edit
(
pdf_data
=
pdf_for_check
,
bl_no
=
bl
.
bl_no
)
if
ai_processed_pdf
:
processed_file_data
=
base64
.
b64encode
(
ai_processed_pdf
)
.
decode
(
'utf-8'
)
# 第四步:再次检查是否还存在目标文字
final_check_pdf
=
base64
.
b64decode
(
processed_file_data
)
text_still_exists
,
final_found_texts
=
self
.
_check_target_texts_exist
(
final_check_pdf
,
bl
.
bl_no
)
if
text_still_exists
:
# 第五步:如果仍然存在,记录错误信息
error_msg
=
f
"提单 {bl.bl_no} 处理后仍存在目标文字: {', '.join(final_found_texts)}"
_logger
.
error
(
error_msg
)
error_messages
.
append
(
error_msg
)
else
:
_logger
.
warning
(
f
"提单 {bl.bl_no} AI处理失败,保持OCR处理结果"
)
except
Exception
as
e
:
_logger
.
error
(
f
"提单 {bl.bl_no} AI处理异常: {str(e)}"
)
# AI处理失败,使用OCR结果,但需要检查
final_check_pdf
=
base64
.
b64decode
(
processed_file_data
)
text_still_exists
,
final_found_texts
=
self
.
_check_target_texts_exist
(
final_check_pdf
,
bl
.
bl_no
)
if
text_still_exists
:
error_msg
=
f
"提单 {bl.bl_no} OCR处理未完全清除文字,AI处理失败: {', '.join(final_found_texts)}"
error_messages
.
append
(
error_msg
)
else
:
_logger
.
info
(
f
"提单 {bl.bl_no} OCR处理成功,目标文字已清除"
)
else
:
_logger
.
warning
(
f
"提单 {bl.bl_no} OCR处理失败"
)
error_messages
.
append
(
f
"提单 {bl.bl_no} OCR处理失败"
)
except
Exception
as
e
:
_logger
.
error
(
f
"提单 {bl.bl_no} OCR处理异常: {str(e)}"
)
error_messages
.
append
(
f
"提单 {bl.bl_no} OCR处理异常: {str(e)}"
)
# 更新文件信息,使用处理后的PDF数据
updated_file_info
=
file_info
.
copy
()
updated_file_info
[
'file_data'
]
=
processed_file_data
updated_files
.
append
(
updated_file_info
)
# 如果有错误信息,合并到show_error_message中
if
error_messages
:
existing_error
=
self
.
show_error_message
or
''
new_errors
=
'
\n
'
.
join
(
error_messages
)
self
.
show_error_message
=
existing_error
+
'
\n
'
+
new_errors
if
existing_error
else
new_errors
return
updated_files
def
_extract_text_from_pdf_with_ocr
(
self
,
pdf_binary
,
bl_no
):
...
...
@@ -368,6 +808,96 @@ class BatchGetPodInfoWizard(models.TransientModel):
pdf_document
.
close
()
return
page_texts
def _process_pdf_with_ai_image_edit(self, pdf_data, bl_no):
    """
    Process a PDF through AI image editing: render each page to an image, let
    the AI erase the text, then rebuild a PDF from the images.

    Pages for which the AI call fails keep their original rendered image.
    Every output page is an image sized in pixels of the 2x rendering.

    :param pdf_data: PDF binary data
    :param bl_no: bill of lading number (used in log messages only)
    :return: binary data of the rebuilt PDF
    """
    import fitz  # PyMuPDF
    import base64
    from PIL import Image

    _logger.info(f"开始使用AI图片编辑处理PDF,提单号: {bl_no}")

    ai_service = AIImageEditService()

    # Step 1: render every page and run it through the AI service.
    page_images = []
    pdf_document = fitz.open(stream=pdf_data, filetype="pdf")
    try:
        for page_num in range(len(pdf_document)):
            page = pdf_document[page_num]
            _logger.info(f"正在处理第{page_num + 1}页")

            mat = fitz.Matrix(2.0, 2.0)  # higher resolution
            pix = page.get_pixmap(matrix=mat)
            img_data = pix.tobytes("png")
            img_base64 = base64.b64encode(img_data).decode('utf-8')

            edited_img_base64 = ai_service.edit_image_remove_text(
                img_base64,
                text_to_remove="AGN UCLINK LOGISITICS LTD"
            )

            if edited_img_base64:
                page_images.append(base64.b64decode(edited_img_base64))
                _logger.info(f"第{page_num + 1}页AI处理成功")
            else:
                _logger.warning(f"第{page_num + 1}页AI处理失败,使用原始页面")
                # Fall back to the unedited rendering of this page.
                page_images.append(img_data)
    finally:
        # Close the source document even if rendering or the AI call raises.
        pdf_document.close()

    # Step 2: rebuild a PDF with one image per page.
    output_doc = fitz.open()
    try:
        for img_data in page_images:
            # Open the image only to read its pixel size, then release it.
            with Image.open(io.BytesIO(img_data)) as img:
                width, height = img.width, img.height

            page = output_doc.new_page(width=width, height=height)
            page.insert_image(
                fitz.Rect(0, 0, width, height),
                stream=img_data,
            )

        output_buffer = io.BytesIO()
        output_doc.save(output_buffer, garbage=4, deflate=True)
        result_data = output_buffer.getvalue()
    finally:
        output_doc.close()

    _logger.info(f"AI图片编辑PDF处理完成,提单号: {bl_no}")
    return result_data
def
_process_pdf_with_ocr
(
self
,
pdf_data
,
bl_no
,
debug_mode
=
False
):
"""
Process PDF with OCR recognition and text removal (完全按照HTML逻辑) # 使用OCR识别处理PDF并删除文字
...
...
ccs_base/wizard/batch_get_pod_info_wizard_views.xml
浏览文件 @
5049b46c
...
...
@@ -9,17 +9,17 @@
<field
name=
"arch"
type=
"xml"
>
<form
string=
"Batch Get POD Info"
>
<!-- 批量获取POD信息 -->
<sheet>
<!-- <group> -->
<group>
<group>
<field
name=
"sync_last_mile_pod"
widget=
"boolean_toggle"
/>
</group>
<group>
<field
name=
"remove_specified_text"
widget=
"boolean_toggle"
/>
</group>
<group>
<field
name=
"sync_match_node"
widget=
"boolean_toggle"
/>
</group>
<field
name=
"sync_last_mile_pod"
widget=
"boolean_toggle"
attrs=
"{'invisible': [('pdf_file', '=', False)]}"
/>
</group>
<group>
<field
name=
"remove_specified_text"
widget=
"boolean_toggle"
attrs=
"{'invisible': [('pdf_file', '!=', False)]}"
/>
</group>
<group>
<field
name=
"sync_match_node"
widget=
"boolean_toggle"
attrs=
"{'invisible': [('pdf_file', '=', False)]}"
/>
</group>
<!-- </group> -->
<div
class=
"alert alert-info"
role=
"alert"
>
<strong>
Description:
</strong>
<!-- 说明: -->
...
...
@@ -32,8 +32,14 @@
<div
class=
"alert alert-danger"
role=
"alert"
attrs=
"{'invisible': [('show_error_message', '=', False)]}"
>
<field
name=
"show_error_message"
/>
</div>
<div>
<field
name=
"pdf_file"
filename=
"pdf_filename"
widget=
"pdf_viewer"
readonly=
"1"
attrs=
"{'invisible': [('pdf_file', '=', False)]}"
/>
</div>
<footer>
<button
string=
"Confirm"
type=
"object"
name=
"confirm"
class=
"btn-primary"
/>
<!-- 预览按钮:处理PDF并显示合并后的文件 -->
<button
string=
"Preview"
type=
"object"
name=
"action_preview"
class=
"btn-primary"
attrs=
"{'invisible': [('pdf_file', '!=', False)]}"
/>
<!-- 确认按钮:使用已处理的文件数据进行回写和同步 -->
<button
string=
"Confirm"
type=
"object"
name=
"confirm"
class=
"btn-primary"
attrs=
"{'invisible': [('pdf_file', '=', False)]}"
/>
<button
string=
"Close"
special=
"cancel"
/>
</footer>
</sheet>
...
...
ccs_base/wizard/image-to-image.py
0 → 100644
浏览文件 @
5049b46c
import
json
import
os
from
dashscope
import
MultiModalConversation
import
dashscope
import
base64
import
requests
dashscope
.
base_http_api_url
=
'https://dashscope.aliyuncs.com/api/v1'
image_path
=
"./图片识别2.png"
def download_image(image_url, save_path='output.png'):
    """Download an image in 8 KiB chunks and write it to ``save_path``.

    Request errors are printed instead of raised.

    :param image_url: URL of the image to download
    :param save_path: destination file path
    """
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(image_url, stream=True, timeout=300) as response:
            response.raise_for_status()  # raises on HTTP error statuses
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        print(f"图像已成功下载到: {save_path}")
    except requests.exceptions.RequestException as e:
        print(f"图像下载失败: {e}")
with open(image_path, "rb") as image_file:
    image_base64 = base64.b64encode(image_file.read()).decode('utf-8')

# The model accepts 1-3 input images.
messages = [
    {
        "role": "user",
        "content": [
            {"image": f"data:image/png;base64,{image_base64}"},
            {"text": "将图片中的AGN UCLINK LOGISITICS LTD这一段文字抹去"}
        ]
    }
]

# API keys differ between the Singapore and Beijing regions.
# How to obtain a key: https://help.aliyun.com/zh/model-studio/get-api-key
# The key is read from the DASHSCOPE_API_KEY environment variable first.
# SECURITY NOTE(review): the literal fallback below is a credential committed to
# source control; rotate it and remove the fallback once the variable is set.
api_key = os.getenv("DASHSCOPE_API_KEY") or 'sk-e41914f0d9c94035a5ae1322e9a61fb1'

# The model only supports single-turn use; it reuses the multi-turn interface.
response = MultiModalConversation.call(
    api_key=api_key,
    model="qwen-image-edit",
    messages=messages,
    stream=False,
    watermark=False,
    negative_prompt=" "
)

if response.status_code == 200:
    # To inspect the full response, uncomment the next line.
    # print(json.dumps(response, ensure_ascii=False))
    image_url = response.output.choices[0].message.content[0]['image']
    print("输出图像的URL:", image_url)
    download_image(image_url, save_path='处理图片.png')
else:
    print(f"HTTP返回码:{response.status_code}")
    print(f"错误码:{response.code}")
    print(f"错误信息:{response.message}")
    print("请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code")
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论