Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
H
hh_ccs
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
贺阳
hh_ccs
Commits
264bf0fd
提交
264bf0fd
authored
10月 30, 2025
作者:
贺阳
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Revert "ai图片识别,但是不是很清晰"
This reverts commit
a1c9cf6b
.
上级
a1c9cf6b
显示空白字符变更
内嵌
并排
正在显示
3 个修改的文件
包含
51 行增加
和
138 行删除
+51
-138
ai_image_edit_service.py
ccs_base/wizard/ai_image_edit_service.py
+27
-34
batch_get_pod_info_wizard.py
ccs_base/wizard/batch_get_pod_info_wizard.py
+24
-30
image-to-image2.py
ccs_base/wizard/image-to-image2.py
+0
-74
没有找到文件。
ccs_base/wizard/ai_image_edit_service.py
浏览文件 @
264bf0fd
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import
base64
import
base64
import
requests
import
requests
from
dashscope
import
ImageSynthesis
from
dashscope
import
MultiModalConversation
import
dashscope
import
dashscope
import
logging
import
logging
from
http
import
HTTPStatus
_logger
=
logging
.
getLogger
(
__name__
)
_logger
=
logging
.
getLogger
(
__name__
)
# 设置DashScope的API地址
(以下为北京地域url,若使用新加坡地域的模型,需将url替换为:https://dashscope-intl.aliyuncs.com/api/v1)
# 设置DashScope的API地址
dashscope
.
base_http_api_url
=
'https://dashscope.aliyuncs.com/api/v1'
dashscope
.
base_http_api_url
=
'https://dashscope.aliyuncs.com/api/v1'
class
AIImageEditService
:
class
AIImageEditService
:
"""AI图片编辑服务 - 使用阿里云百炼的
wan2.5-i2i-preview
模型"""
"""AI图片编辑服务 - 使用阿里云百炼的
qwen-image-edit
模型"""
def
__init__
(
self
,
api_key
=
'sk-e41914f0d9c94035a5ae1322e9a61fb1'
):
def
__init__
(
self
,
api_key
=
'sk-e41914f0d9c94035a5ae1322e9a61fb1'
):
self
.
api_key
=
api_key
self
.
api_key
=
api_key
self
.
model
=
"
wan2.5-i2i-preview
"
self
.
model
=
"
qwen-image-edit
"
def
edit_image_remove_text
(
self
,
image_base64
,
text_to_remove
=
"AGN UCLINK LOGISITICS LTD"
):
def
edit_image_remove_text
(
self
,
image_base64
,
text_to_remove
=
"AGN UCLINK LOGISITICS LTD"
):
"""
"""
使用AI模型移除图片中的指定文字
使用AI模型移除图片中的指定文字
:param image_base64: 图片的base64编码
(不包含data:image/png;base64,前缀)
:param image_base64: 图片的base64编码
:param text_to_remove: 要移除的文字
:param text_to_remove: 要移除的文字
:return: 处理后的图片base64编码,失败返回None
:return: 处理后的图片base64编码,失败返回None
"""
"""
try
:
try
:
# 构建图片URL(使用Base64编码格式:data:{MIME_type};base64,{base64_data})
# 构建消息
image_url
=
f
"data:image/png;base64,{image_base64}"
messages
=
[
{
# 构建提示词(与image-to-image2.py中的格式保持一致)
"role"
:
"user"
,
# 如果text_to_remove包含"AGN"和"UCLINK LOGISITICS LTD",则拆分为两段
"content"
:
[
if
"AGN"
in
text_to_remove
and
"UCLINK"
in
text_to_remove
:
{
"image"
:
f
"data:image/png;base64,{image_base64}"
},
prompt
=
"将图片中的 AGN 跟 UCLINK LOGISITICS LTD 这两段文字抹去,确保其他的内容都保留,包括水印等"
{
"text"
:
f
"将图片中的{text_to_remove}这一段文字抹去,保持背景完全一致"
}
else
:
]
prompt
=
f
"将图片中的 {text_to_remove} 这段文字抹去,确保其他的内容都保留,包括水印等"
}
]
# 调用AI模型
# 调用AI模型
_logger
.
info
(
f
"开始调用AI图片编辑服务,模型: {self.model}"
)
response
=
MultiModalConversation
.
call
(
rsp
=
ImageSynthesis
.
call
(
api_key
=
self
.
api_key
,
api_key
=
self
.
api_key
,
model
=
self
.
model
,
model
=
self
.
model
,
prompt
=
prompt
,
messages
=
messages
,
images
=
[
image_url
],
stream
=
False
,
negative_prompt
=
""
,
n
=
1
,
watermark
=
False
,
watermark
=
False
,
seed
=
12345
negative_prompt
=
" "
)
)
if
r
sp
.
status_code
==
HTTPStatus
.
OK
:
if
r
esponse
.
status_code
==
200
:
# 获取处理后图片的URL
# 获取处理后图片的URL
if
rsp
.
output
and
rsp
.
output
.
results
and
len
(
rsp
.
output
.
results
)
>
0
:
image_url
=
response
.
output
.
choices
[
0
]
.
message
.
content
[
0
][
'image'
]
image_url_result
=
rsp
.
output
.
results
[
0
]
.
url
_logger
.
info
(
f
"AI图片编辑成功,图片URL: {image_url}"
)
_logger
.
info
(
f
"AI图片编辑成功,图片URL: {image_url_result}"
)
# 下载图片并转换为base64
# 下载图片并转换为base64
edited_image_base64
=
self
.
download_and_convert_to_base64
(
image_url_result
)
edited_image_base64
=
self
.
download_and_convert_to_base64
(
image_url
)
return
edited_image_base64
return
edited_image_base64
else
:
else
:
_logger
.
error
(
f
"AI图片编辑失败,返回结果为空"
)
_logger
.
error
(
f
"AI图片编辑失败,HTTP返回码:{response.status_code}"
)
return
None
_logger
.
error
(
f
"错误码:{response.code}"
)
else
:
_logger
.
error
(
f
"错误信息:{response.message}"
)
_logger
.
error
(
f
"AI图片编辑失败,HTTP返回码:{rsp.status_code}"
)
_logger
.
error
(
f
"错误码:{rsp.code}"
)
_logger
.
error
(
f
"错误信息:{rsp.message}"
)
return
None
return
None
except
Exception
as
e
:
except
Exception
as
e
:
...
@@ -77,7 +70,7 @@ class AIImageEditService:
...
@@ -77,7 +70,7 @@ class AIImageEditService:
下载图片并转换为base64
下载图片并转换为base64
:param image_url: 图片URL
:param image_url: 图片URL
:param timeout: 超时时间
:param timeout: 超时时间
:return: base64编码的图片数据
(不包含data:image/png;base64,前缀)
:return: base64编码的图片数据
"""
"""
try
:
try
:
response
=
requests
.
get
(
image_url
,
stream
=
True
,
timeout
=
timeout
)
response
=
requests
.
get
(
image_url
,
stream
=
True
,
timeout
=
timeout
)
...
...
ccs_base/wizard/batch_get_pod_info_wizard.py
浏览文件 @
264bf0fd
...
@@ -658,7 +658,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
...
@@ -658,7 +658,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
def
_remove_specified_text
(
self
,
processed_files
,
debug_mode
=
False
):
def
_remove_specified_text
(
self
,
processed_files
,
debug_mode
=
False
):
"""
"""
移除PDF中的指定文字:先用
AI图片编辑处理,检查是否还存在,如果存在则用OCR
处理,再次检查
移除PDF中的指定文字:先用
OCR处理,检查是否还存在,如果存在则用AI
处理,再次检查
:param processed_files: 处理后的文件数组
:param processed_files: 处理后的文件数组
:param debug_mode: 是否显示调试标记
:param debug_mode: 是否显示调试标记
:return: 处理后的文件数组(包含处理后的PDF数据)
:return: 处理后的文件数组(包含处理后的PDF数据)
...
@@ -678,35 +678,35 @@ class BatchGetPodInfoWizard(models.TransientModel):
...
@@ -678,35 +678,35 @@ class BatchGetPodInfoWizard(models.TransientModel):
# 将base64数据转换为二进制
# 将base64数据转换为二进制
pdf_binary
=
base64
.
b64decode
(
file_data
)
pdf_binary
=
base64
.
b64decode
(
file_data
)
# 先提取文本用于后续同步节点功能
(从原始PDF提取)
# 先提取文本用于后续同步节点功能
if
'ocr_texts'
not
in
file_info
:
if
'ocr_texts'
not
in
file_info
:
file_info
[
'ocr_texts'
]
=
self
.
_extract_text_from_pdf_with_ocr
(
pdf_binary
,
bl
.
bl_no
)
file_info
[
'ocr_texts'
]
=
self
.
_extract_text_from_pdf_with_ocr
(
pdf_binary
,
bl
.
bl_no
)
# 第一步:使用
AI图片编辑
处理PDF
# 第一步:使用
OCR方法
处理PDF
_logger
.
info
(
f
"提单 {bl.bl_no} 开始
AI图片编辑
处理"
)
_logger
.
info
(
f
"提单 {bl.bl_no} 开始
OCR
处理"
)
try
:
try
:
ai_processed_pdf
=
self
.
_process_pdf_with_ai_image_edit
(
processed_pdf
=
self
.
_process_pdf_with_ocr
(
pdf_data
=
pdf_binary
,
pdf_data
=
pdf_binary
,
bl_no
=
bl
.
bl_no
bl_no
=
bl
.
bl_no
,
debug_mode
=
debug_mode
)
)
if
ai_
processed_pdf
:
if
processed_pdf
:
processed_file_data
=
base64
.
b64encode
(
ai_
processed_pdf
)
.
decode
(
'utf-8'
)
processed_file_data
=
base64
.
b64encode
(
processed_pdf
)
.
decode
(
'utf-8'
)
# 第二步:检查是否还存在目标文字
# 第二步:检查是否还存在目标文字
pdf_for_check
=
base64
.
b64decode
(
processed_file_data
)
pdf_for_check
=
base64
.
b64decode
(
processed_file_data
)
text_exists
,
found_texts
=
self
.
_check_target_texts_exist
(
pdf_for_check
,
bl
.
bl_no
)
text_exists
,
found_texts
=
self
.
_check_target_texts_exist
(
pdf_for_check
,
bl
.
bl_no
)
if
text_exists
:
if
text_exists
:
# 第三步:如果还存在,使用
OCR方法处理PDF
# 第三步:如果还存在,使用
AI图片编辑处理
_logger
.
info
(
f
"提单 {bl.bl_no}
AI处理后仍存在目标文字,使用OCR
处理"
)
_logger
.
info
(
f
"提单 {bl.bl_no}
OCR处理后仍存在目标文字,使用AI图片编辑
处理"
)
try
:
try
:
ocr_processed_pdf
=
self
.
_process_pdf_with_ocr
(
ai_processed_pdf
=
self
.
_process_pdf_with_ai_image_edit
(
pdf_data
=
pdf_for_check
,
pdf_data
=
pdf_for_check
,
bl_no
=
bl
.
bl_no
,
bl_no
=
bl
.
bl_no
debug_mode
=
debug_mode
)
)
if
ocr
_processed_pdf
:
if
ai
_processed_pdf
:
processed_file_data
=
base64
.
b64encode
(
ocr
_processed_pdf
)
.
decode
(
'utf-8'
)
processed_file_data
=
base64
.
b64encode
(
ai
_processed_pdf
)
.
decode
(
'utf-8'
)
# 第四步:再次检查是否还存在目标文字
# 第四步:再次检查是否还存在目标文字
final_check_pdf
=
base64
.
b64decode
(
processed_file_data
)
final_check_pdf
=
base64
.
b64decode
(
processed_file_data
)
...
@@ -718,29 +718,23 @@ class BatchGetPodInfoWizard(models.TransientModel):
...
@@ -718,29 +718,23 @@ class BatchGetPodInfoWizard(models.TransientModel):
_logger
.
error
(
error_msg
)
_logger
.
error
(
error_msg
)
error_messages
.
append
(
error_msg
)
error_messages
.
append
(
error_msg
)
else
:
else
:
_logger
.
warning
(
f
"提单 {bl.bl_no} OCR处理失败,保持AI处理结果"
)
_logger
.
warning
(
f
"提单 {bl.bl_no} AI处理失败,保持OCR处理结果"
)
# OCR处理失败,检查AI处理后的结果
final_check_pdf
=
base64
.
b64decode
(
processed_file_data
)
text_still_exists
,
final_found_texts
=
self
.
_check_target_texts_exist
(
final_check_pdf
,
bl
.
bl_no
)
if
text_still_exists
:
error_msg
=
f
"提单 {bl.bl_no} AI处理未完全清除文字,OCR处理失败: {', '.join(final_found_texts)}"
error_messages
.
append
(
error_msg
)
except
Exception
as
e
:
except
Exception
as
e
:
_logger
.
error
(
f
"提单 {bl.bl_no}
OCR
处理异常: {str(e)}"
)
_logger
.
error
(
f
"提单 {bl.bl_no}
AI
处理异常: {str(e)}"
)
#
OCR处理异常,检查AI处理后的结果
#
AI处理失败,使用OCR结果,但需要检查
final_check_pdf
=
base64
.
b64decode
(
processed_file_data
)
final_check_pdf
=
base64
.
b64decode
(
processed_file_data
)
text_still_exists
,
final_found_texts
=
self
.
_check_target_texts_exist
(
final_check_pdf
,
bl
.
bl_no
)
text_still_exists
,
final_found_texts
=
self
.
_check_target_texts_exist
(
final_check_pdf
,
bl
.
bl_no
)
if
text_still_exists
:
if
text_still_exists
:
error_msg
=
f
"提单 {bl.bl_no}
AI处理未完全清除文字,OCR处理异常
: {', '.join(final_found_texts)}"
error_msg
=
f
"提单 {bl.bl_no}
OCR处理未完全清除文字,AI处理失败
: {', '.join(final_found_texts)}"
error_messages
.
append
(
error_msg
)
error_messages
.
append
(
error_msg
)
else
:
else
:
_logger
.
info
(
f
"提单 {bl.bl_no}
AI
处理成功,目标文字已清除"
)
_logger
.
info
(
f
"提单 {bl.bl_no}
OCR
处理成功,目标文字已清除"
)
else
:
else
:
_logger
.
warning
(
f
"提单 {bl.bl_no}
AI
处理失败"
)
_logger
.
warning
(
f
"提单 {bl.bl_no}
OCR
处理失败"
)
error_messages
.
append
(
f
"提单 {bl.bl_no}
AI
处理失败"
)
error_messages
.
append
(
f
"提单 {bl.bl_no}
OCR
处理失败"
)
except
Exception
as
e
:
except
Exception
as
e
:
_logger
.
error
(
f
"提单 {bl.bl_no}
AI
处理异常: {str(e)}"
)
_logger
.
error
(
f
"提单 {bl.bl_no}
OCR
处理异常: {str(e)}"
)
error_messages
.
append
(
f
"提单 {bl.bl_no}
AI
处理异常: {str(e)}"
)
error_messages
.
append
(
f
"提单 {bl.bl_no}
OCR
处理异常: {str(e)}"
)
# 更新文件信息,使用处理后的PDF数据
# 更新文件信息,使用处理后的PDF数据
updated_file_info
=
file_info
.
copy
()
updated_file_info
=
file_info
.
copy
()
...
...
ccs_base/wizard/image-to-image2.py
deleted
100644 → 0
浏览文件 @
a1c9cf6b
import
base64
import
mimetypes
from
http
import
HTTPStatus
from
urllib.parse
import
urlparse
,
unquote
from
pathlib
import
PurePosixPath
import
dashscope
import
requests
from
dashscope
import
ImageSynthesis
import
os
# 以下为北京地域url,若使用新加坡地域的模型,需将url替换为:https://dashscope-intl.aliyuncs.com/api/v1
dashscope
.
base_http_api_url
=
'https://dashscope.aliyuncs.com/api/v1'
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx"
# 新加坡和北京地域的API Key不同。获取API Key:https://help.aliyun.com/zh/model-studio/get-api-key
api_key
=
'sk-e41914f0d9c94035a5ae1322e9a61fb1'
# --- 输入图片:使用 Base64 编码 ---
# base64编码格式为 data:{MIME_type};base64,{base64_data}
def
encode_file
(
file_path
):
mime_type
,
_
=
mimetypes
.
guess_type
(
file_path
)
if
not
mime_type
or
not
mime_type
.
startswith
(
"image/"
):
raise
ValueError
(
"不支持或无法识别的图像格式"
)
with
open
(
file_path
,
"rb"
)
as
image_file
:
encoded_string
=
base64
.
b64encode
(
image_file
.
read
())
.
decode
(
'utf-8'
)
return
f
"data:{mime_type};base64,{encoded_string}"
"""
图像输入方式说明:
以下提供了三种图片输入方式,三选一即可
1. 使用公网URL - 适合已有公开可访问的图片
2. 使用本地文件 - 适合本地开发测试
3. 使用Base64编码 - 适合私有图片或需要加密传输的场景
"""
# 【方式一】使用公网图片 URL
# image_url_1 = "https://img.alicdn.com/imgextra/i4/O1CN01TlDlJe1LR9zso3xAC_!!6000000001295-2-tps-1104-1472.png"
# image_url_2 = "https://img.alicdn.com/imgextra/i4/O1CN01M9azZ41YdblclkU6Z_!!6000000003082-2-tps-1696-960.png"
# 【方式二】使用本地文件(支持绝对路径和相对路径)
# 格式要求:file:// + 文件路径
# 示例(绝对路径):
# image_url_1 = "file://" + "/path/to/your/image_1.png" # Linux/macOS
# image_url_2 = "file://" + "C:/path/to/your/image_2.png" # Windows
# 示例(相对路径):
# image_url_1 = "file://" + "./image_1.png" # 以实际路径为准
# image_url_2 = "file://" + "./image_1.png" # 以实际路径为准
# 【方式三】使用Base64编码的图片
image_url_1
=
encode_file
(
"./图片识别2.png"
)
# 以实际路径为准
# image_url_2 = encode_file("./image_2.png") # 以实际路径为准
print
(
'----sync call, please wait a moment----'
)
rsp
=
ImageSynthesis
.
call
(
api_key
=
api_key
,
model
=
"wan2.5-i2i-preview"
,
prompt
=
"将图片中的 AGN 跟 UCLINK LOGISITICS LTD 这两段文字抹去, 确保其他的内容都保留,包括水印等"
,
images
=
[
image_url_1
],
negative_prompt
=
""
,
n
=
1
,
watermark
=
False
,
seed
=
12345
)
print
(
'response:
%
s'
%
rsp
)
if
rsp
.
status_code
==
HTTPStatus
.
OK
:
# 在当前目录下保存图片
for
result
in
rsp
.
output
.
results
:
file_name
=
PurePosixPath
(
unquote
(
urlparse
(
result
.
url
)
.
path
))
.
parts
[
-
1
]
with
open
(
'./
%
s'
%
file_name
,
'wb+'
)
as
f
:
f
.
write
(
requests
.
get
(
result
.
url
)
.
content
)
else
:
print
(
'sync_call Failed, status_code:
%
s, code:
%
s, message:
%
s'
%
(
rsp
.
status_code
,
rsp
.
code
,
rsp
.
message
))
\ No newline at end of file
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论