提交 cd1abb3c authored 作者: 贺阳's avatar 贺阳

ai处理清晰的优化

上级 a98c002c
......@@ -1169,7 +1169,7 @@ class CcBL(models.Model):
def action_batch_get_pod_info(self):
"""批量获取尾程POD信息"""
return {
'name': _('Batch Get POD Info'),
'name': _('Preview POD Files'),#展示预览文件 #_('Batch Get POD Info'),
'type': 'ir.actions.act_window',
'view_mode': 'form',
'res_model': 'batch.get.pod.info.wizard',
......
......@@ -32,56 +32,97 @@ class AIImageEditService:
start_time = time.time()
_logger.info(f"开始AI图片编辑,目标文字: {text_to_remove}")
print(f"开始AI图片编辑,目标文字: {text_to_remove}") # 添加print输出
try:
# 根据要移除的文字构建不同的提示词,强调保持清晰度
if "AGN" in text_to_remove and "UCLINK" in text_to_remove:
prompt = "将图片中的AGN和UCLINK LOGISITICS LTD这一段文字完全抹去,保持背景完全一致,保持图片清晰度和分辨率,不要模糊"
prompt = "Remove the text 'AGN' and 'UCLINK LOGISITICS LTD' from the image completely, maintain perfect background consistency, preserve maximum image clarity and sharpness, high resolution, crisp details, professional quality, no blur, no pixelation, maintain original image quality"
elif "AGN" in text_to_remove:
prompt = "将图片中的AGN这一段文字完全抹去,保持背景完全一致,保持图片清晰度和分辨率,不要模糊"
prompt = "Remove the text 'AGN' from the image completely, maintain perfect background consistency, preserve maximum image clarity and sharpness, high resolution, crisp details, professional quality, no blur, no pixelation, maintain original image quality"
elif "UCLINK" in text_to_remove:
prompt = "将图片中的UCLINK LOGISITICS LTD这一段文字完全抹去,保持背景完全一致,保持图片清晰度和分辨率,不要模糊"
prompt = "Remove the text 'UCLINK LOGISITICS LTD' from the image completely, maintain perfect background consistency, preserve maximum image clarity and sharpness, high resolution, crisp details, professional quality, no blur, no pixelation, maintain original image quality"
else:
prompt = f"将图片中的{text_to_remove}这一段文字完全抹去,保持背景完全一致,保持图片清晰度和分辨率,不要模糊"
prompt = f"Remove the text '{text_to_remove}' from the image completely, maintain perfect background consistency, preserve maximum image clarity and sharpness, high resolution, crisp details, professional quality, no blur, no pixelation, maintain original image quality"
# 调用ImageSynthesis模型,使用更高质量的设置
api_start_time = time.time()
rsp = ImageSynthesis.call(
api_key=self.api_key, # 显式传递API密钥
model=self.model,
prompt=prompt,
images=[f"data:image/png;base64,{image_base64}"],
negative_prompt="模糊、不清晰、低质量、文字、文本、字母、数字、噪点、失真",
negative_prompt="blurry, unclear, low quality, text, letters, numbers, noise, distortion, pixelated, compressed, low resolution, fuzzy, grainy, soft, out of focus, low detail, poor quality, artifacts, compression artifacts, jpeg artifacts, lossy compression",
n=1,
watermark=False,
seed=12345,
# 添加质量相关参数
size="1024*1024", # 使用较高分辨率
quality="hd" # 使用高清质量
# 移除size参数,让AI使用原始图片尺寸
# size="2048*2048", # 这个参数格式不正确,导致API错误
quality="hd", # 使用高清质量
# 添加更多质量参数
style="photographic", # 使用摄影风格保持清晰度
enhance_prompt=True, # 增强提示词效果
)
api_end_time = time.time()
api_processing_time = api_end_time - api_start_time
_logger.info(f"AI API调用耗时: {api_processing_time:.2f}秒")
if rsp.status_code == HTTPStatus.OK:
# 获取处理后图片的URL
image_url = rsp.output.results[0].url
_logger.info(f"AI图片编辑成功,图片URL: {image_url}")
# 检查返回结果结构
_logger.info(f"API响应结构: {rsp}")
print(f"API响应结构: {rsp}")
# 下载图片并转换为base64
download_start_time = time.time()
edited_image_base64 = self.download_and_convert_to_base64(image_url)
download_end_time = time.time()
download_time = download_end_time - download_start_time
_logger.info(f"图片下载耗时: {download_time:.2f}秒")
# 检查任务状态
if hasattr(rsp, 'output') and hasattr(rsp.output, 'task_status'):
task_status = rsp.output.task_status
_logger.info(f"AI任务状态: {task_status}")
print(f"AI任务状态: {task_status}")
if task_status == "FAILED":
error_code = getattr(rsp.output, 'code', 'Unknown')
error_message = getattr(rsp.output, 'message', 'Unknown error')
_logger.error(f"AI任务失败 - 错误码: {error_code}, 错误信息: {error_message}")
print(f"AI任务失败 - 错误码: {error_code}, 错误信息: {error_message}")
return None
elif task_status != "SUCCEEDED":
_logger.warning(f"AI任务状态异常: {task_status}")
print(f"AI任务状态异常: {task_status}")
return None
total_time = time.time() - start_time
_logger.info(f"AI图片编辑总耗时: {total_time:.2f}秒")
return edited_image_base64
# 安全地获取处理后图片的URL
try:
if hasattr(rsp, 'output') and hasattr(rsp.output, 'results') and len(rsp.output.results) > 0:
image_url = rsp.output.results[0].url
_logger.info(f"AI图片编辑成功,图片URL: {image_url}")
print(f"AI图片编辑成功,图片URL: {image_url}") # 添加print输出
# 下载图片并转换为base64
download_start_time = time.time()
edited_image_base64 = self.download_and_convert_to_base64(image_url)
download_end_time = time.time()
download_time = download_end_time - download_start_time
_logger.info(f"图片下载耗时: {download_time:.2f}秒")
total_time = time.time() - start_time
_logger.info(f"AI图片编辑总耗时: {total_time:.2f}秒")
print(f"AI图片编辑总耗时: {total_time:.2f}秒") # 添加print输出
return edited_image_base64
else:
_logger.error(f"API返回结果结构异常: {rsp}")
print(f"API返回结果结构异常: {rsp}")
return None
except (IndexError, AttributeError) as e:
_logger.error(f"解析API返回结果失败: {str(e)}")
_logger.error(f"完整响应: {rsp}")
print(f"解析API返回结果失败: {str(e)}")
print(f"完整响应: {rsp}")
return None
else:
_logger.error(f"AI图片编辑失败,HTTP返回码:{rsp.status_code}")
_logger.error(f"错误码:{rsp.code}")
_logger.error(f"错误信息:{rsp.message}")
print(f"AI图片编辑失败,HTTP返回码:{rsp.status_code}") # 添加print输出
return None
except Exception as e:
......
......@@ -31,7 +31,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
sync_last_mile_pod = fields.Boolean(
string='Sync Last Mile POD', # 同步尾程POD
default=True,
default=False,
help='Whether to sync last mile POD information' # 是否同步尾程POD信息
)
......@@ -43,7 +43,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
sync_match_node = fields.Boolean(
string='Sync Push Match Node', # 同步推送匹配节点
default=True,
default=False,
help='Whether to sync and push matched node information' # 是否同步推送匹配节点信息
)
......@@ -61,18 +61,6 @@ class BatchGetPodInfoWizard(models.TransientModel):
pdf_filename = fields.Char(string='PDF文件名称')
processed_files_data = fields.Text(string='已处理的文件数据', help='存储已处理的文件信息(JSON格式)')
# def __del__(self):
# """
# 析构方法:确保在对象被销毁时清理临时文件
# """
# try:
# logging.info(f"__del__: {self._name}")
# if hasattr(self, '_name') and self._name == 'batch.get.pod.info.wizard':
# self._cleanup_all_temp_files()
# except Exception as e:
# # 析构方法中不能抛出异常
# logging.error(f"__del__: {str(e)}")
def action_preview(self):
"""
预览操作:获取PDF、处理涂抹、合并PDF并显示
......@@ -683,7 +671,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
if text_still_exists:
# 第五步:如果仍然存在,记录错误信息并停止处理
error_msg = f"提单 {bl.bl_no} 经过OCR和AI处理后仍存在目标文字: {', '.join(final_found_texts)},请手动检查"
error_msg = f"提单 {bl.bl_no} 经过系统处理后仍存在目标文字: {', '.join(final_found_texts)},请取消该提单操作,手动处理"
_logger.error(error_msg)
error_messages.append(error_msg)
# 不更新文件数据,保持原始状态
......@@ -694,7 +682,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
ocr_check_pdf = base64.b64decode(processed_file_data)
text_still_exists, ocr_found_texts = self._check_target_texts_exist(ocr_check_pdf, bl.bl_no)
if text_still_exists:
error_msg = f"提单 {bl.bl_no} OCR处理未完全清除文字,AI处理失败: {', '.join(ocr_found_texts)},请手动检查"
error_msg = f"提单 {bl.bl_no} 经过系统处理后仍存在目标文字: {', '.join(ocr_found_texts)},请取消该提单操作,手动处理"
error_messages.append(error_msg)
# 不更新文件数据,保持原始状态
processed_file_data = file_data
......@@ -706,7 +694,7 @@ class BatchGetPodInfoWizard(models.TransientModel):
final_check_pdf = base64.b64decode(processed_file_data)
text_still_exists, final_found_texts = self._check_target_texts_exist(final_check_pdf, bl.bl_no)
if text_still_exists:
error_msg = f"提单 {bl.bl_no} OCR处理未完全清除文字,AI处理失败: {', '.join(final_found_texts)},请手动检查"
error_msg = f"提单 {bl.bl_no} 经过系统处理后仍存在目标文字: {', '.join(final_found_texts)},请取消该提单操作,手动处理"
error_messages.append(error_msg)
# 不更新文件数据,保持原始状态
processed_file_data = file_data
......@@ -828,6 +816,29 @@ class BatchGetPodInfoWizard(models.TransientModel):
# 转换为base64
img_base64 = base64.b64encode(img_data).decode('utf-8')
# 保存AI转换前的图片用于对比
try:
import os
from datetime import datetime
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
before_ai_filename = f"before_ai_{bl_no}_page{page_num + 1}_{timestamp}.png"
# 创建保存目录
save_dir = os.path.join(os.getcwd(), "ai_comparison_images")
if not os.path.exists(save_dir):
os.makedirs(save_dir)
before_ai_path = os.path.join(save_dir, before_ai_filename)
# 保存原始图片
with open(before_ai_path, 'wb') as f:
f.write(img_data)
_logger.info(f"已保存AI转换前的图片: {before_ai_path}")
print(f"已保存AI转换前的图片: {before_ai_path}") # 添加print输出
except Exception as e:
_logger.warning(f"保存AI转换前图片失败: {str(e)}")
print(f"保存AI转换前图片失败: {str(e)}") # 添加print输出
# 使用AI编辑图片,移除指定文字
ai_start_time = time.time()
edited_img_base64 = ai_service.edit_image_remove_text(
......@@ -841,6 +852,20 @@ class BatchGetPodInfoWizard(models.TransientModel):
# 解码base64图片数据
edited_img_data = base64.b64decode(edited_img_base64)
# 保存AI转换后的图片用于对比
try:
after_ai_filename = f"after_ai_{bl_no}_page{page_num + 1}_{timestamp}.png"
after_ai_path = os.path.join(save_dir, after_ai_filename)
# 保存AI处理后的图片
with open(after_ai_path, 'wb') as f:
f.write(edited_img_data)
_logger.info(f"已保存AI转换后的图片: {after_ai_path}")
print(f"已保存AI转换后的图片: {after_ai_path}") # 添加print输出
except Exception as e:
_logger.warning(f"保存AI转换后图片失败: {str(e)}")
print(f"保存AI转换后图片失败: {str(e)}") # 添加print输出
# 保存处理后的图片数据
processed_pages.append({
'img_data': edited_img_data,
......
import base64
import mimetypes
from http import HTTPStatus
from urllib.parse import urlparse, unquote
from pathlib import PurePosixPath
import dashscope
import requests
from dashscope import ImageSynthesis
import os
# 以下为北京地域url,若使用新加坡地域的模型,需将url替换为:https://dashscope-intl.aliyuncs.com/api/v1
dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx"
# 新加坡和北京地域的API Key不同。获取API Key:https://help.aliyun.com/zh/model-studio/get-api-key
api_key = 'sk-e41914f0d9c94035a5ae1322e9a61fb1'
# --- 输入图片:使用 Base64 编码 ---
# base64编码格式为 data:{MIME_type};base64,{base64_data}
def encode_file(file_path):
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type or not mime_type.startswith("image/"):
raise ValueError("不支持或无法识别的图像格式")
with open(file_path, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
return f"data:{mime_type};base64,{encoded_string}"
"""
图像输入方式说明:
以下提供了三种图片输入方式,三选一即可
1. 使用公网URL - 适合已有公开可访问的图片
2. 使用本地文件 - 适合本地开发测试
3. 使用Base64编码 - 适合私有图片或需要加密传输的场景
"""
# 【方式一】使用公网图片 URL
# image_url_1 = "https://img.alicdn.com/imgextra/i4/O1CN01TlDlJe1LR9zso3xAC_!!6000000001295-2-tps-1104-1472.png"
# image_url_2 = "https://img.alicdn.com/imgextra/i4/O1CN01M9azZ41YdblclkU6Z_!!6000000003082-2-tps-1696-960.png"
# 【方式二】使用本地文件(支持绝对路径和相对路径)
# 格式要求:file:// + 文件路径
# 示例(绝对路径):
# image_url_1 = "file://" + "/path/to/your/image_1.png" # Linux/macOS
# image_url_2 = "file://" + "C:/path/to/your/image_2.png" # Windows
# 示例(相对路径):
# image_url_1 = "file://" + "./image_1.png" # 以实际路径为准
# image_url_2 = "file://" + "./image_1.png" # 以实际路径为准
# 【方式三】使用Base64编码的图片
image_url_1 = encode_file("./图片识别2.png") # 以实际路径为准
# image_url_2 = encode_file("./image_2.png") # 以实际路径为准
print('----sync call, please wait a moment----')
rsp = ImageSynthesis.call(api_key=api_key,
model="wan2.5-i2i-preview",
prompt="将图片中的 AGN 跟 UCLINK LOGISITICS LTD 这两段文字抹去, 确保其他的内容都保留,包括水印等",
images=[image_url_1],
negative_prompt="",
n=1,
watermark=False,
seed=12345)
print('response: %s' % rsp)
if rsp.status_code == HTTPStatus.OK:
# 在当前目录下保存图片
for result in rsp.output.results:
file_name = PurePosixPath(unquote(urlparse(result.url).path)).parts[-1]
with open('./%s' % file_name, 'wb+') as f:
f.write(requests.get(result.url).content)
else:
print('sync_call Failed, status_code: %s, code: %s, message: %s' %
(rsp.status_code, rsp.code, rsp.message))
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论