提交 85aad0cf authored 作者: 伍姿英's avatar 伍姿英

Merge branch 'release/3.11.0'

...@@ -47,6 +47,7 @@ ...@@ -47,6 +47,7 @@
'views/cc_history_package_sync_log_view.xml', 'views/cc_history_package_sync_log_view.xml',
'views/history_tt_api_log.xml', 'views/history_tt_api_log.xml',
'views/res_partner_view.xml', 'views/res_partner_view.xml',
'views/config_setting.xml',
'views/menu_view.xml', 'views/menu_view.xml',
# 'views/cc_customers_declaration_order_view.xml', # 'views/cc_customers_declaration_order_view.xml',
'templates/login.xml', 'templates/login.xml',
......
...@@ -8,6 +8,11 @@ import re ...@@ -8,6 +8,11 @@ import re
import tempfile import tempfile
from datetime import datetime, timedelta from datetime import datetime, timedelta
from io import BytesIO from io import BytesIO
import csv # 确保导入csv处理工具
import os
# 引入你的本地脚本函数(PDF 工具)
from ..pdf_tools.pod_indexer import index_pod_directory
from ..pdf_tools.awb_page_merger import merge_awb_pages
import pdfplumber import pdfplumber
import xlrd import xlrd
...@@ -120,6 +125,9 @@ class OrderStateChangeRule(models.Model): ...@@ -120,6 +125,9 @@ class OrderStateChangeRule(models.Model):
def upload_pod_attachment(self, bl_obj, name, data,file_name='货站提货POD'): def upload_pod_attachment(self, bl_obj, name, data,file_name='货站提货POD'):
"""尾程交接POD(待大包数量和箱号)/货站提货POD 文件上传与同步""" """尾程交接POD(待大包数量和箱号)/货站提货POD 文件上传与同步"""
none_clearance_file_objs = self.env['cc.clearance.file'].sudo().search([('file_name', '=', file_name),
('bl_id', 'in', bl_obj.ids), ('file', '=', False)])
none_clearance_file_objs.unlink()
arr = [ arr = [
{ {
'file_name': file_name, 'file_name': file_name,
...@@ -143,20 +151,20 @@ class OrderStateChangeRule(models.Model): ...@@ -143,20 +151,20 @@ class OrderStateChangeRule(models.Model):
retries += 1 # 上传失败,增加重试次数 retries += 1 # 上传失败,增加重试次数
if retries > max_retries: if retries > max_retries:
_logger.info(f"上传文件 {file_obj.attachment_name} 失败,已尝试 {max_retries} 次,仍然失败。") _logger.info(f"上传文件 {file_obj.attachment_name} 失败,已尝试 {max_retries} 次,仍然失败。")
break # 超过最大重试次数后跳出循环 return False # 超过最大重试次数后跳出循环
else: else:
_logger.info(f"上传文件 {file_obj.attachment_name} 失败,正在重新尝试第 {retries} 次...") _logger.info(f"上传文件 {file_obj.attachment_name} 失败,正在重新尝试第 {retries} 次...")
else: else:
# 如果上传成功,退出重试循环 # 如果上传成功,退出重试循环
_logger.info(f"文件 {file_obj.attachment_name} 上传成功") _logger.info(f"文件 {file_obj.attachment_name} 上传成功")
break return True
except Exception as e: except Exception as e:
# 捕获任何异常并重试 # 捕获任何异常并重试
retries += 1 retries += 1
_logger.info(f"发生异常:{e},正在重新尝试第 {retries} 次上传文件 {file_obj.attachment_name}...") _logger.info(f"发生异常:{e},正在重新尝试第 {retries} 次上传文件 {file_obj.attachment_name}...")
if retries > max_retries: if retries > max_retries:
_logger.info(f"上传文件 {file_obj.attachment_name} 失败,已尝试 {max_retries} 次,仍然失败。") _logger.info(f"上传文件 {file_obj.attachment_name} 失败,已尝试 {max_retries} 次,仍然失败。")
break # 超过最大重试次数后跳出循环 return False # 超过最大重试次数后跳出循环
def get_pdf_order_data(self, attachment_data): def get_pdf_order_data(self, attachment_data):
"""识别PDF数据方法""" """识别PDF数据方法"""
...@@ -238,66 +246,276 @@ class OrderStateChangeRule(models.Model): ...@@ -238,66 +246,276 @@ class OrderStateChangeRule(models.Model):
pattern = re.compile("\\d{3}-\\d{8}\s*") pattern = re.compile("\\d{3}-\\d{8}\s*")
data_re = re.compile(pattern) data_re = re.compile(pattern)
data_arr = data_re.findall(email_body) data_arr = data_re.findall(email_body)
data_arr = [i.replace('\r\n', '') for i in data_arr] data_arr = [i.replace('\r\n', '').replace('\xa0', '') for i in data_arr]
return data_arr return data_arr
def fetch_final_mail_dlv(self, **kwargs): def fetch_final_mail_dlv(self, **kwargs):
"""尾程交接邮件提取""" """尾程交接邮件提取 - 增强版(支持仅凭附件识别同步)"""
email_body = kwargs['email_body'] email_body = kwargs['email_body']
email_body = html.unescape(email_body) email_body = html.unescape(email_body)
# 1. 提取邮件正文中的单号
text_arr = self.find_final_email_text(email_body) text_arr = self.find_final_email_text(email_body)
logging.info('data_arr: %s' % text_arr) logging.info('邮件正文提取单号: %s' % text_arr)
attachment_arr = kwargs['attachment_arr'] attachment_arr = kwargs['attachment_arr']
# attachment_tuple = attachment_arr[0] if attachment_arr else []
attachment_tuple_arr = attachment_arr if attachment_arr else [] attachment_tuple_arr = attachment_arr if attachment_arr else []
# order_obj_arr = []
try: try:
text_arr = [i.replace('-', '').replace(' ', '') for i in text_arr] from ..pdf_tools.baidu_ocr_config import get_baidu_ocr_config
ids = [] from ..pdf_tools import awb_page_merger
if text_arr:
sql = "select id from cc_bl where UPPER(REPLACE(REPLACE(REPLACE(bl_no, ' ', ''), '-', ''), '/', '')) in %s" ICP = self.env['ir.config_parameter'].sudo()
self._cr.execute(sql, (tuple(text_arr),)) # 1. 组装 Odoo 系统参数
result = self._cr.fetchall() odoo_ocr_config = {
ids = [i[0] for i in result] "baidu_ocr_app_id": ICP.get_param('ocr.baidu_app_id', ''),
bl_objs = self.env['cc.bl'].sudo().search([('id', 'in', ids)]) if ids else False "baidu_ocr_api_key": ICP.get_param('ocr.baidu_api_key', ''),
if bl_objs and attachment_tuple_arr: "baidu_ocr_secret_key": ICP.get_param('ocr.baidu_secret_key', ''),
file_objs = self.env['cc.clearance.file'].sudo().search( # Odoo的系统参数存的都是字符串,所以需要做类型转换
[('file_name', '=', '尾程交接POD(待大包数量和箱号)'), "ocr_enabled": ICP.get_param('ocr.enabled') == 'True',
('bl_id', 'in', bl_objs.ids)]) "ocr_timeout": int(ICP.get_param('ocr.timeout', 30)),
file_objs.unlink() "max_retries": int(ICP.get_param('ocr.max_retries', 3)),
for attachment_tuple in attachment_tuple_arr: }
attachment_name, attachment_data = attachment_tuple print(odoo_ocr_config)
self.upload_pod_attachment(bl_objs, attachment_name, attachment_data) # 2. 注入配置:传 save_to_file=False 保证只在当前运行内存生效,不修改底层 json
# redis_conn = self.env['common.common'].sudo().get_redis() ocr_config_manager = get_baidu_ocr_config()
# if redis_conn == 'no': ocr_config_manager.update_config(odoo_ocr_config, save_to_file=False)
# raise ValidationError('未连接redis') # 3. 核心!清理 awb_page_merger 中的全局懒加载缓存
# else: # 因为 awb_page_merger 缓存了 _OCR_EXTRACTOR 实例
# redis_conn.lpush('mail_push_package_list', json.dumps({'id': bl_obj.id, 'utc_time': utc_time.strftime("%Y-%m-%d %H:%M:%S")})) # 必须重置为 None,否则如果在 Odoo 界面修改了秘钥,系统还会继续使用旧秘钥
if not bl_objs: awb_page_merger._OCR_EXTRACTOR = None
mail_time = (datetime.utcnow() + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S") logging.info("Odoo OCR 配置注入成功")
content = f"""<p>您好: except ImportError as e:
邮箱在{mail_time}(+8)时间接收到主题为POD的邮件,但未识别到对应的提单,请检查 logging.error(f"导入 OCR 模块失败,请检查路径: {e}")
避免推送超时!</p>""" except Exception as e:
# 给客户配置的每个邮箱都发送邮件 logging.error(f"注入 OCR 配置时发生错误: {e}")
patrol_sender_email = self.env["ir.config_parameter"].sudo().get_param('patrol_sender_email') or '' try:
patrol_receiver_emails = self.env["ir.config_parameter"].sudo().get_param( # 清洗邮件正文提取的单号
'patrol_receiver_emails') or '' text_arr = [i.replace('-', '').replace(' ', '').replace('\xa0', '') for i in text_arr]
mail = self.env['mail.mail'].sudo().create({ # 2. 如果有附件,无论邮件正文有没有单号,都要进行 PDF 拆分识别
"email_from": patrol_sender_email, if attachment_tuple_arr:
'subject': 'POD邮件未提取到提单', with tempfile.TemporaryDirectory() as temp_dir:
'body_html': content, pod_dir = os.path.join(temp_dir, "POD")
'email_to': patrol_receiver_emails pages_dir = os.path.join(pod_dir, "pages")
# 'email_to': "1663490807@qq.com,820656583@qq.com" output_dir = os.path.join(temp_dir, "Output")
}) os.makedirs(pod_dir)
mail.send() os.makedirs(output_dir)
for i in range(2): for file_name, pdf_data in attachment_tuple_arr:
if mail.failure_reason: pdf_path = os.path.join(pod_dir, file_name)
logging.info('邮件发送失败原因:%s' % mail.failure_reason) with open(pdf_path, 'wb') as f:
mail.write({'state': 'outgoing'}) f.write(pdf_data)
ctx_index = {
"dir_path": pod_dir,
"output_index_csv": os.path.join(pod_dir, "pod_index.csv"),
"output_summary_csv": os.path.join(pod_dir, "summary.csv"),
"save_pages": True,
"page_output_dir": pages_dir,
"pipeline_split_first": True
}
# 这一步执行 OCR 识别和拆分
index_pod_directory(ctx_index)
# --- 【新增逻辑:从识别结果 CSV 中提取单号】 ---
pdf_detected_awbs = []
index_csv_path = ctx_index["output_index_csv"]
if os.path.exists(index_csv_path):
with open(index_csv_path, 'r', encoding='utf-8') as f:
# 自动处理标题行前后的空格
reader = csv.DictReader(f)
for row in reader:
# 获取 'awbs' 列的内容
awb_raw_str = row.get('awbs', '')
if awb_raw_str:
# 按逗号分割字符串,并对每个单号进行清洗
split_awbs = [a.strip().replace('-', '').replace(' ', '')
for a in awb_raw_str.split(',') if a.strip()]
pdf_detected_awbs.extend(split_awbs)
# 合并邮件正文单号和 PDF 识别单号,去重
combined_text_arr = list(set(text_arr + pdf_detected_awbs))
logging.info('合并后的待查询单号池: %s' % combined_text_arr)
# ----------------------------------------------
# 3. 根据合并后的单号去数据库查提单对象
ids = []
if combined_text_arr:
sql = "select id from cc_bl where UPPER(REPLACE(REPLACE(REPLACE(bl_no, ' ', ''), '-', ''), '/', '')) in %s"
self._cr.execute(sql, (tuple(combined_text_arr),))
result = self._cr.fetchall()
ids = [i[0] for i in result]
bl_objs = self.env['cc.bl'].sudo().search([('id', 'in', ids)]) if ids else False
not_bl_pdf_arr = []
upload_fail_arr = []
if bl_objs:
for bl_obj in bl_objs:
target_awb = bl_obj.bl_no
if not target_awb:
continue
ctx_merge = {
"awb": target_awb,
"index_file": ctx_index["output_index_csv"],
"pages_dir": pages_dir,
"output_dir": output_dir
}
result_merge = merge_awb_pages(ctx_merge)
if result_merge.get("output") and os.path.exists(result_merge["output"]):
with open(result_merge["output"], 'rb') as f:
extracted_pdf_bytes = f.read()
is_upload_ok = self.upload_pod_attachment(bl_obj, f'{bl_obj.bl_no}.pdf', extracted_pdf_bytes)
if not is_upload_ok:
upload_fail_arr.append(bl_obj.bl_no)
else:
not_bl_pdf_arr.append(bl_obj.bl_no)
self._cr.commit()
# 4. 异常报警逻辑
if not bl_objs or not_bl_pdf_arr or upload_fail_arr:
mail_time = (datetime.utcnow() + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
content = f"""<p>您好:
邮箱在{mail_time}(+8)时间接收到主题为POD的邮件,但未识别到对应的提单,请检查
避免推送超时!</p>
"""
if not_bl_pdf_arr:
content += f"\n 以下提单未提取到PDF文件: {'/'.join(not_bl_pdf_arr)}"
if upload_fail_arr:
content += f"<br> 以下提单向TK推送POD文件失败: {'/'.join(upload_fail_arr)}"
# 给客户配置的每个邮箱都发送邮件
patrol_sender_email = self.env["ir.config_parameter"].sudo().get_param(
'patrol_sender_email') or ''
patrol_receiver_emails = self.env["ir.config_parameter"].sudo().get_param(
'patrol_receiver_emails') or ''
mail = self.env['mail.mail'].sudo().create({
"email_from": patrol_sender_email,
'subject': 'POD邮件未提取到提单',
'body_html': content,
'email_to': patrol_receiver_emails
# 'email_to': "1663490807@qq.com,820656583@qq.com"
})
mail.send() mail.send()
for i in range(2):
if mail.failure_reason:
logging.info('邮件发送失败原因:%s' % mail.failure_reason)
mail.write({'state': 'outgoing'})
mail.send()
except Exception as err: except Exception as err:
logging.error('fetch_final_mail_dlv--error:%s' % str(err)) logging.error('fetch_final_mail_dlv--error:%s' % str(err))
# def fetch_final_mail_dlv(self, **kwargs):
# """尾程交接邮件提取"""
# email_body = kwargs['email_body']
# email_body = html.unescape(email_body)
# text_arr = self.find_final_email_text(email_body)
# logging.info('data_arr: %s' % text_arr)
# attachment_arr = kwargs['attachment_arr']
# # attachment_tuple = attachment_arr[0] if attachment_arr else []
# attachment_tuple_arr = attachment_arr if attachment_arr else []
# # order_obj_arr = []
# try:
# text_arr = [i.replace('-', '').replace(' ', '').replace('\xa0', '') for i in text_arr]
# ids = []
# if text_arr:
# sql = "select id from cc_bl where UPPER(REPLACE(REPLACE(REPLACE(bl_no, ' ', ''), '-', ''), '/', '')) in %s"
# self._cr.execute(sql, (tuple(text_arr),))
# result = self._cr.fetchall()
# ids = [i[0] for i in result]
# bl_objs = self.env['cc.bl'].sudo().search([('id', 'in', ids)]) if ids else False
# not_bl_pdf_arr = []
# if bl_objs:
# # 提单对象 bl_no提单号
# # attachment_tuple_arr [('11.pdf', 'pdf数据')]
# # 1. 开启临时文件夹 (with 块结束时,所有临时文件会自动销毁)
# with tempfile.TemporaryDirectory() as temp_dir:
# # 构建临时目录结构
# pod_dir = os.path.join(temp_dir, "POD")
# pages_dir = os.path.join(pod_dir, "pages")
# output_dir = os.path.join(temp_dir, "Output")
# os.makedirs(pod_dir)
# os.makedirs(output_dir)
# # 2. 将内存中的 PDF 数据写入临时目录
# for file_name, pdf_data in attachment_tuple_arr:
# pdf_path = os.path.join(pod_dir, file_name)
# with open(pdf_path, 'wb') as f:
# # 注意:Odoo 里的附件通常是 base64 编码的。
# # 如果你的 'pdf数据' 是 base64 字符串/bytes,请用 base64.b64decode(pdf_data)
# # 如果已经是纯二进制流(rb读取的),直接写入即可:f.write(pdf_data)
# # f.write(base64.b64decode(pdf_data))
# f.write(pdf_data)
# # 3. 对这些 PDF 进行集中拆分和识别(只执行一次,非常关键)
# ctx_index = {
# "dir_path": pod_dir,
# "output_index_csv": os.path.join(pod_dir, "pod_index.csv"),
# "output_summary_csv": os.path.join(pod_dir, "summary.csv"),
# "save_pages": True,
# "page_output_dir": pages_dir,
# "pipeline_split_first": True
# }
# # 这一步会消耗一点时间,它会生成单页 PDF 和索引 CSV
# index_pod_directory(ctx_index)
# # 4. 遍历你的提单对象,按需提取 PDF
# for bl_obj in bl_objs:
# target_awb = bl_obj.bl_no # 获取提单号,例如 '436-10353136'
# if not target_awb:
# continue
# # 调用拼合工具
# ctx_merge = {
# "awb": target_awb,
# "index_file": ctx_index["output_index_csv"],
# "pages_dir": pages_dir,
# "output_dir": output_dir
# }
# result = merge_awb_pages(ctx_merge)
# # 5. 检查是否成功生成了对应的单号 PDF
# if result.get("output") and os.path.exists(result["output"]):
# # 将生成的 PDF 重新读回内存
# with open(result["output"], 'rb') as f:
# extracted_pdf_bytes = f.read()
# # 重新转为 base64,准备存入 Odoo
# # extracted_pdf_b64 = base64.b64encode(extracted_pdf_bytes)
# # print(extracted_pdf_bytes)
# self.upload_pod_attachment(bl_obj, f'{bl_obj.bl_no}.pdf', extracted_pdf_bytes)
# # bl_pdf_arr.append((bl_obj, f'{bl_obj.bl_no}.pdf', extracted_pdf_bytes))
#
# else:
# # 没找到这个单号对应的页面
# not_bl_pdf_arr.append(bl_obj.bl_no)
# # 这里可以记个日志,或者给 bl_obj 打个“未找到凭证”的标签
# self._cr.commit()
# # 屏蔽 2026-03-26以下
# # if bl_objs and attachment_tuple_arr:
# # file_objs = self.env['cc.clearance.file'].sudo().search(
# # [('file_name', '=', '尾程交接POD(待大包数量和箱号)'),
# # ('bl_id', 'in', bl_objs.ids)])
# # file_objs.unlink()
# # for attachment_tuple in attachment_tuple_arr:
# # attachment_name, attachment_data = attachment_tuple
# # self.upload_pod_attachment(bl_objs, attachment_name, attachment_data)
# # 屏蔽 2026-03-26 以上
# # redis_conn = self.env['common.common'].sudo().get_redis()
# # if redis_conn == 'no':
# # raise ValidationError('未连接redis')
# # else:
# # redis_conn.lpush('mail_push_package_list', json.dumps({'id': bl_obj.id, 'utc_time': utc_time.strftime("%Y-%m-%d %H:%M:%S")}))
# if not bl_objs or not_bl_pdf_arr:
# mail_time = (datetime.utcnow() + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
# content = f"""<p>您好:
# 邮箱在{mail_time}(+8)时间接收到主题为POD的邮件,但未识别到对应的提单,请检查
# 避免推送超时!</p>
# """
# if not_bl_pdf_arr:
# content += f"\n 以下提单未提取到PDF文件 {'/'.join(not_bl_pdf_arr)}"
# # 给客户配置的每个邮箱都发送邮件
# patrol_sender_email = self.env["ir.config_parameter"].sudo().get_param('patrol_sender_email') or ''
# patrol_receiver_emails = self.env["ir.config_parameter"].sudo().get_param(
# 'patrol_receiver_emails') or ''
# mail = self.env['mail.mail'].sudo().create({
# "email_from": patrol_sender_email,
# 'subject': 'POD邮件未提取到提单',
# 'body_html': content,
# 'email_to': patrol_receiver_emails
# # 'email_to': "1663490807@qq.com,820656583@qq.com"
# })
# mail.send()
# for i in range(2):
# if mail.failure_reason:
# logging.info('邮件发送失败原因:%s' % mail.failure_reason)
# mail.write({'state': 'outgoing'})
# mail.send()
# except Exception as err:
# logging.error('fetch_final_mail_dlv--error:%s' % str(err))
def fetch_mail_dlv(self, **kwargs): def fetch_mail_dlv(self, **kwargs):
email_body = kwargs['email_body'] email_body = kwargs['email_body']
year = kwargs['year'] year = kwargs['year']
......
...@@ -17,6 +17,15 @@ class ResConfigSettings(models.TransientModel): ...@@ -17,6 +17,15 @@ class ResConfigSettings(models.TransientModel):
is_package_scan = fields.Boolean( is_package_scan = fields.Boolean(
'一键全扫开关', default=False, config_parameter='is_package_scan') '一键全扫开关', default=False, config_parameter='is_package_scan')
# OCR 相关配置
baidu_ocr_app_id = fields.Char(string="Baidu OCR App ID", config_parameter='ocr.baidu_app_id')
baidu_ocr_api_key = fields.Char(string="Baidu OCR API Key", config_parameter='ocr.baidu_api_key')
baidu_ocr_secret_key = fields.Char(string="Baidu OCR Secret Key", config_parameter='ocr.baidu_secret_key')
ocr_enabled = fields.Boolean(string="是否启用 OCR", config_parameter='ocr.enabled', default=True)
ocr_timeout = fields.Integer(string="OCR 超时时间(秒)", config_parameter='ocr.timeout', default=30)
ocr_max_retries = fields.Integer(string="最大重试次数", config_parameter='ocr.max_retries', default=3)
@api.model @api.model
def get_values(self): def get_values(self):
""" """
......
"""
AWB页面拼合工具
功能:
- 输入一个 AWB 编号,读取 `POD/pod_index.csv` 中包含该 AWB 的记录,定位对应的单页 PDF(`POD/pages/{index_id}.pdf`),并按页顺序拼合为一个 PDF,输出到 `POD/{AWB}.pdf`。
使用:
- CLI:`python POD/awb_page_merger.py --awb 436-XXXXXXX`
- 可选参数:`--index` 指定索引文件路径(默认 `POD/pod_index.csv`),`--pages-dir` 指定单页目录(默认 `POD/pages`),`--output-dir` 输出目录(默认 `POD`)。
说明:
- 按用户约定优先使用 context7 传参;CLI参数将转换为 context7。
- 仅处理与本工具相关的文件,不改动其他模块。
"""
from __future__ import annotations
import os
import csv
import sys
import argparse
from typing import List, Dict, Any, Optional, Tuple
import logging
from datetime import datetime
# Hard dependency: PyMuPDF (fitz) does all PDF reading/merging below.
try:
    import fitz  # PyMuPDF
except ImportError as e:
    raise RuntimeError("需要安装 PyMuPDF (fitz)。请在环境中安装:pip install PyMuPDF")
# Optional dependency: Baidu OCR extractor, used as a text-recognition fallback.
# Import failure is tolerated; the module then runs in text-extraction-only mode.
try:
    from .baidu_ocr_extractor import BaiduOCRExtractor
    BAIDU_OCR_AVAILABLE = True
except ImportError as e:
    print(f"警告: 无法导入百度OCR提取器: {e}")
    BAIDU_OCR_AVAILABLE = False
# Unified Baidu-OCR availability: lazily build the extractor and cache it here.
# NOTE: external code (the Odoo model) resets this to None to force re-init
# after credentials change — keep the name and module-global semantics.
_OCR_EXTRACTOR = None


def _get_baidu_ocr_extractor():
    """Return a cached BaiduOCRExtractor instance, or None when unavailable.

    The extractor is imported and constructed lazily so that a missing or
    misconfigured OCR backend never breaks module import; the result is
    memoized in the module-global ``_OCR_EXTRACTOR``.
    """
    global _OCR_EXTRACTOR
    if _OCR_EXTRACTOR is None:
        try:
            from .baidu_ocr_extractor import BaiduOCRExtractor  # deferred import
            _OCR_EXTRACTOR = BaiduOCRExtractor()
            logger.info("成功初始化百度OCR提取器")
        except Exception as e:
            logger.warning(f"百度OCR初始化失败: {e}")
            _OCR_EXTRACTOR = None
    return _OCR_EXTRACTOR
# Module-level logger (kept consistent with the release_notes splitter style).
logger = logging.getLogger(__name__)
# Attach a handler only once so repeated imports do not duplicate log lines.
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
logger.setLevel(logging.INFO)
def _build_context7_from_args(args: argparse.Namespace) -> Dict[str, Any]:
"""从命令行参数构建 context7 字典
函数说明:
- 将 CLI 参数统一转换为 context7 格式,便于在代码内部传参。
返回:包含 awb、index_file、pages_dir、output_dir 的字典。
"""
return {
"awb": args.awb,
"index_file": args.index,
"pages_dir": args.pages_dir,
"output_dir": args.output_dir,
}
def load_index_records(index_path: str) -> List[Dict[str, Any]]:
    """Load the pod index CSV into a list of row dicts.

    Expected header: ``file,page,index_id,awbs``. ``page`` is coerced to
    int (0 when missing or blank); the other columns default to empty
    strings. The ``awbs`` value is kept as the raw comma-separated string.

    Raises:
        FileNotFoundError: when *index_path* does not exist.
    """
    if not os.path.isfile(index_path):
        logger.warning(f"索引文件不存在: {index_path}")
        raise FileNotFoundError(f"索引文件不存在: {index_path}")
    records: List[Dict[str, Any]] = []
    with open(index_path, "r", newline="", encoding="utf-8") as fh:
        for row in csv.DictReader(fh):
            records.append({
                "file": row.get("file", ""),
                "page": int(row.get("page", "0") or 0),
                "index_id": row.get("index_id", ""),
                "awbs": row.get("awbs", ""),
            })
    logger.info(f"索引加载完成: {index_path} | 记录数={len(records)}")
    return records
def _parse_awbs_field(awbs_field: str) -> List[str]:
"""解析CSV中的AWB字段为列表
函数说明:
- 输入形如:`"436-123456, 436-987654"` 的字符串,拆分为列表并去除空格。
返回:AWB字符串列表(已strip)。
"""
if not awbs_field:
return []
return [x.strip() for x in awbs_field.split(",") if x.strip()]
def find_page_paths_for_awb(index_records: List[Dict[str, Any]], pages_dir: str, target_awb: str) -> List[str]:
    """Locate the single-page PDFs for *target_awb*, deduped by arrival date.

    - Scans *index_records* for rows whose ``awbs`` column contains the
      exact *target_awb* and resolves ``pages_dir/{index_id}.pdf``.
    - Each candidate page's "Date Arrived" is extracted (via
      ``extract_date_arrived_from_pdf``); only one page is kept per
      (AWB, date), and at most one page with an unknown date.
    - Candidates are ordered by ``(file, page)`` for stable merge output.

    Returns:
        Deduplicated list of existing single-page PDF paths.
    """
    logger.info(f"开始查找匹配页: AWB={target_awb} | pages_dir={pages_dir}")
    # Exact-match rows, ordered by source file then page number.
    matched = [rec for rec in index_records
               if target_awb in _parse_awbs_field(rec.get("awbs", ""))]
    matched.sort(key=lambda rec: (rec.get("file", ""), rec.get("page", 0)))
    # Resolve to existing single-page PDF paths.
    candidates: List[Tuple[str, Dict[str, Any]]] = []
    for rec in matched:
        index_id = rec.get("index_id")
        if not index_id:
            continue
        page_path = os.path.join(pages_dir, f"{index_id}.pdf")
        if not os.path.isfile(page_path):
            logger.debug(f"缺失单页PDF: {page_path}")
            continue
        candidates.append((page_path, rec))
    # Deduplicate on AWB + arrival date.
    seen_dates: set = set()          # normalized dates already kept
    kept_unknown_date = False        # at most one page with no date
    deduped: List[str] = []
    logger.info(f"候选页数={len(candidates)},开始按到达日期去重")
    for page_path, rec in candidates:
        arrived = extract_date_arrived_from_pdf(page_path)
        if arrived:
            if arrived in seen_dates:
                continue
            seen_dates.add(arrived)
            deduped.append(page_path)
            logger.debug(f"保留页(含日期): {os.path.basename(page_path)} | Date Arrived={arrived}")
        else:
            if kept_unknown_date:
                continue
            kept_unknown_date = True
            deduped.append(page_path)
            logger.debug(f"保留页(日期缺失): {os.path.basename(page_path)}")
    logger.info(f"去重后匹配页数={len(deduped)}")
    return deduped
def merge_pages_to_pdf(page_paths: List[str], output_path: str) -> int:
    """Merge several single-page PDFs into one output file.

    Pages that cannot be opened are skipped so one corrupt page does not
    abort the whole merge. The output is saved with garbage collection and
    deflate compression to reduce size without losing fidelity.

    Args:
        page_paths: ordered list of single-page PDF paths to concatenate.
        output_path: destination PDF path; parent dirs are created on demand.

    Returns:
        Number of pages actually inserted.
    """
    # Bug fix: os.path.dirname() returns "" for a bare filename, and
    # os.makedirs("") raises FileNotFoundError — only create real parents.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    out = fitz.open()
    count = 0
    for p in page_paths:
        try:
            src = fitz.open(p)
            out.insert_pdf(src)
            count += src.page_count
            src.close()
        except Exception:
            # Skip unreadable pages; keep merging the rest.
            continue
    # Save with cleanup/compression; older PyMuPDF lacks the `clean` kwarg.
    try:
        out.save(output_path, garbage=3, deflate=True, clean=True)
    except TypeError:
        out.save(output_path, garbage=3, deflate=True)
    out.close()
    return count
def merge_awb_pages(context7: Dict[str, Any]) -> Dict[str, Any]:
    """Main flow: find and merge the single-page PDFs for one AWB.

    Reads the index file, resolves the pages containing the target AWB
    (deduplicated per arrival date by ``find_page_paths_for_awb``), and
    merges them into ``{output_dir}/{AWB}.pdf``.

    context7 keys:
        awb (required), index_file, pages_dir, output_dir — the path keys
        default to the module directory / project ``release_notes_pdf``.

    Returns:
        Dict with ``awb``, ``output`` (None when nothing matched),
        ``merged_pages`` and ``page_paths``.

    Raises:
        ValueError: when ``awb`` is missing from context7.
    """
    awb = context7.get("awb")
    if not awb:
        raise ValueError("缺少参数 awb")
    # Resolve defaults relative to this module / the project root.
    base_dir = os.path.dirname(__file__)
    project_root = os.path.dirname(os.path.dirname(base_dir))
    index_file = context7.get("index_file") or os.path.join(base_dir, "pod_index.csv")
    pages_dir = context7.get("pages_dir") or os.path.join(base_dir, "pages")
    output_dir = context7.get("output_dir") or os.path.join(project_root, "release_notes_pdf")
    logger.info(f"开始合并流程: AWB={awb}")
    records = load_index_records(index_file)
    page_paths = find_page_paths_for_awb(records, pages_dir, awb)
    if not page_paths:
        logger.info(f"未找到包含AWB的页面: {awb}")
        return {"awb": awb, "output": None, "merged_pages": 0, "page_paths": []}
    output_path = os.path.join(output_dir, f"{awb}.pdf")
    logger.info(f"输出文件: {output_path} | 待合并页数={len(page_paths)}")
    merged = merge_pages_to_pdf(page_paths, output_path)
    logger.info(f"合并完成: AWB={awb} | 合并页数={merged}")
    return {"awb": awb, "output": output_path, "merged_pages": merged, "page_paths": page_paths}
def _normalize_date(date_str: str) -> Optional[str]:
"""规范化日期字符串为 `YYYY-MM-DD`
函数说明:
- 支持常见的 `dd/mm/yyyy`、`dd-mm-yyyy`、`yyyy/mm/dd`、`yyyy-mm-dd`。
- 自动补零并保证位数正确;无法解析返回 None。
"""
if not date_str:
return None
import re
s = date_str.strip()
# 先统一分隔符
s = s.replace(".", "/").replace("-", "/")
parts = s.split("/")
if len(parts) != 3:
return None
a, b, c = parts
# 判断是 yyyy/mm/dd 还是 dd/mm/yyyy
try:
if len(a) == 4: # yyyy/mm/dd
yyyy = int(a)
mm = int(b)
dd = int(c)
else: # dd/mm/yyyy
dd = int(a)
mm = int(b)
yyyy = int(c)
# 合法性简单校验
if not (1 <= mm <= 12 and 1 <= dd <= 31 and 1900 <= yyyy <= 2100):
return None
return f"{yyyy:04d}-{mm:02d}-{dd:02d}"
except Exception:
return None
def extract_date_arrived_from_pdf(page_pdf_path: str) -> Optional[str]:
    """Extract the "Date Arrived" from a single-page PDF as ``YYYY-MM-DD``.

    Strategy (first successful parse wins):
    1. Embedded text: date right after a "Date Arrived" anchor.
    2. Embedded text: first generic date pattern anywhere on the page.
    3. Baidu OCR fallback: render the page to an image, OCR it, then apply
       the same two rules to the OCR text.

    Returns:
        Normalized ``YYYY-MM-DD`` string, or None when no parseable date is
        found or on any error (the function never raises).
    """
    try:
        import fitz
        if not os.path.isfile(page_pdf_path):
            return None
        doc = fitz.open(page_pdf_path)
        if doc.page_count == 0:
            doc.close()
            return None
        # Only the first page is inspected — callers pass single-page PDFs.
        page = doc.load_page(0)
        text = page.get_text("text") or ""
        doc.close()
        txt = text.strip()
        # 1) Anchor on "Date Arrived" and grab the date right after it.
        import re
        anchor = re.search(r"Date\s*Arrived[:\s]*([\d/\-.]{8,10})", txt, flags=re.IGNORECASE)
        if anchor:
            norm = _normalize_date(anchor.group(1))
            if norm:
                return norm
        # 2) Text fallback: first token that looks like a date.
        m = re.search(r"\b(\d{1,2}[\-/\.]\d{1,2}[\-/\.]\d{2,4}|\d{4}[\-/\.]\d{1,2}[\-/\.]\d{1,2})\b", txt)
        if m:
            norm = _normalize_date(m.group(0))
            if norm:
                return norm
        # 3) OCR fallback: PDF page -> image -> OCR text -> same rules.
        extractor = _get_baidu_ocr_extractor()
        if extractor is not None:
            try:
                img_bytes = extractor.pdf_page_to_image(page_pdf_path, 0)
                if img_bytes:
                    ocr_txt = extractor.extract_text_from_image(img_bytes, use_accurate=False) or ""
                    ocr_txt = ocr_txt.strip()
                    # 3.1 Anchor match on the OCR text.
                    anchor2 = re.search(r"Date\s*Arrived[:\s]*([\d/\-.]{8,10})", ocr_txt, flags=re.IGNORECASE)
                    if anchor2:
                        norm = _normalize_date(anchor2.group(1))
                        if norm:
                            return norm
                    # 3.2 Generic date match on the OCR text.
                    m2 = re.search(r"\b(\d{1,2}[\-/\.]\d{1,2}[\-/\.]\d{2,4}|\d{4}[\-/\.]\d{1,2}[\-/\.]\d{1,2})\b", ocr_txt)
                    if m2:
                        norm = _normalize_date(m2.group(0))
                        if norm:
                            return norm
            except Exception:
                # OCR fallback failed — fall through to None silently.
                pass
        return None
    except Exception:
        return None
def _normalize_awb(s: str) -> Optional[str]:
"""规范化提单号为 `xxx-xxxxxxxx` 格式
- 接受 `436-10284956`、`436 10284956`、`43610284956` 等形式
- 返回标准化字符串或 None
"""
if not s:
return None
import re
m = re.match(r"^\s*(\d{3})\D*(\d{7,10})\s*$", s)
if not m:
return None
prefix, number = m.group(1), m.group(2)
# 统一截断到常见8位(若超过)
if len(number) > 8:
number = number[:8]
return f"{prefix}-{number}"
def _find_awbs_in_text(text: str) -> List[str]:
"""从文本中提取 AWB 列表并规范化为唯一集合
- 正则匹配 `3位前缀 + 可选分隔 + 7-10位数字`
- 返回去重后的标准化 AWB 列表
"""
if not text:
return []
import re
patterns = [r"\b(\d{3})[\-\s]?(\d{7,10})\b"]
found = []
for pat in patterns:
for m in re.finditer(pat, text):
norm = _normalize_awb(f"{m.group(1)}-{m.group(2)}")
if norm and norm not in found:
found.append(norm)
return found
def _extract_date_arrived_from_page(pdf_path: str, page_index_zero_based: int) -> Optional[str]:
    """Extract the arrival date from one page of a multi-page PDF.

    Same recognition chain as ``extract_date_arrived_from_pdf`` — embedded
    text with a "Date Arrived" anchor, then a generic date pattern, then a
    Baidu OCR fallback — but addressed by page index within *pdf_path*.

    Returns:
        Normalized ``YYYY-MM-DD`` string, or None when the page index is out
        of range, no date is found, or any error occurs (never raises).
    """
    try:
        import fitz, re
        if not os.path.isfile(pdf_path):
            return None
        doc = fitz.open(pdf_path)
        if page_index_zero_based >= doc.page_count:
            doc.close()
            return None
        page = doc.load_page(page_index_zero_based)
        txt = (page.get_text("text") or "").strip()
        doc.close()
        # 1) Anchor on "Date Arrived" in the embedded text.
        anchor = re.search(r"Date\s*Arrived[:\s]*([\d/\-.]{8,10})", txt, flags=re.IGNORECASE)
        if anchor:
            norm = _normalize_date(anchor.group(1))
            if norm:
                return norm
        # 2) Generic date pattern anywhere in the embedded text.
        m = re.search(r"\b(\d{1,2}[\-/\.]\d{1,2}[\-/\.]\d{2,4}|\d{4}[\-/\.]\d{1,2}[\-/\.]\d{1,2})\b", txt)
        if m:
            norm = _normalize_date(m.group(0))
            if norm:
                return norm
        # 3) OCR fallback: render this page to an image and re-run the rules.
        extractor = _get_baidu_ocr_extractor()
        if extractor is not None:
            try:
                img_bytes = extractor.pdf_page_to_image(pdf_path, page_index_zero_based)
                if img_bytes:
                    ocr_txt = extractor.extract_text_from_image(img_bytes, use_accurate=False) or ""
                    ocr_txt = ocr_txt.strip()
                    anchor2 = re.search(r"Date\s*Arrived[:\s]*([\d/\-.]{8,10})", ocr_txt, flags=re.IGNORECASE)
                    if anchor2:
                        norm = _normalize_date(anchor2.group(1))
                        if norm:
                            return norm
                    m2 = re.search(r"\b(\d{1,2}[\-/\.]\d{1,2}[\-/\.]\d{2,4}|\d{4}[\-/\.]\d{1,2}[\-/\.]\d{1,2})\b", ocr_txt)
                    if m2:
                        norm = _normalize_date(m2.group(0))
                        if norm:
                            return norm
            except Exception:
                # OCR fallback failed — ignore and return None below.
                pass
        return None
    except Exception:
        return None
def split_and_merge_pod(context7: Dict[str, Any]) -> Dict[str, Any]:
    """Split POD PDFs and merge pages per AWB + arrival date on the fly.

    Unlike ``merge_awb_pages`` this does not need ``pod_index.csv``: AWBs
    and arrival dates are recognized while iterating pages (embedded text
    first, Baidu OCR as fallback).

    Dedup policy: one page per (AWB, arrival date); at most one page per
    AWB when the date is unknown.
    Append policy: if ``{output_dir}/{AWB}.pdf`` already exists, new pages
    are appended to it; otherwise a fresh document is created.

    context7 keys:
        input_files: explicit list of PDF paths (takes precedence), or
        input_dir: directory scanned for ``*.pdf`` (default: module dir).
        output_dir: destination directory (default:
            ``<project_root>/release_notes_pdf``).

    Returns:
        Stats dict: ``total_files``, ``total_pages``, ``total_merged_pages``,
        ``merged_files`` (awb -> output path), ``missing_pages``.
    """
    # Resolve parameters and default paths.
    base_dir = os.path.dirname(__file__)
    project_root = os.path.dirname(os.path.dirname(base_dir))
    input_files: List[str] = context7.get("input_files") or []
    input_dir = context7.get("input_dir") or base_dir
    output_dir = context7.get("output_dir") or os.path.join(project_root, "release_notes_pdf")
    os.makedirs(output_dir, exist_ok=True)
    logger.info(f"开始POD分拆合并: input_dir={input_dir} | output_dir={output_dir}")
    # Collect candidate PDFs when no explicit list was given.
    if not input_files:
        try:
            for name in os.listdir(input_dir):
                if name.lower().endswith(".pdf"):
                    input_files.append(os.path.join(input_dir, name))
        except Exception as e:
            # Best-effort: an unreadable directory simply yields zero files.
            # (Cleanup: dropped the redundant `pass` after this log call.)
            logger.debug(f"遍历输入目录失败: {input_dir} | 错误={e}")
    # Per-AWB output documents and dedup bookkeeping.
    awb_docs: Dict[str, Dict[str, Any]] = {}   # awb -> {"doc": fitz doc, "path": str}
    awb_date_seen: Dict[str, set] = {}         # awb -> set of YYYY-MM-DD already kept
    awb_seen_unknown: Dict[str, bool] = {}     # awb -> kept an undated page already?

    def _get_awb_doc(awb: str) -> Tuple[Any, str]:
        """Open (for appending) or create the merge document for *awb*."""
        path = os.path.join(output_dir, f"{awb}.pdf")
        if awb in awb_docs:
            return awb_docs[awb]["doc"], awb_docs[awb]["path"]
        if os.path.exists(path):
            try:
                doc = fitz.open(path)
            except Exception:
                # Existing file unreadable — start a fresh document.
                doc = fitz.open()
        else:
            doc = fitz.open()
        awb_docs[awb] = {"doc": doc, "path": path}
        return doc, path

    total_files = 0
    total_pages = 0
    merged_pages = 0
    merged_files: Dict[str, str] = {}
    missing_pages: List[Dict[str, Any]] = []
    logger.info(f"待处理PDF数量={len(input_files)}")
    for pdf_path in input_files:
        if not os.path.isfile(pdf_path):
            continue
        total_files += 1
        try:
            doc = fitz.open(pdf_path)
            page_count = doc.page_count
            total_pages += page_count
            logger.info(f"处理文件: {os.path.basename(pdf_path)} | 页数={page_count}")
            for page_index in range(page_count):
                page = doc.load_page(page_index)
                text = page.get_text("text") or ""
                awbs = _find_awbs_in_text(text)
                # OCR fallback when the embedded text yields no AWB.
                if not awbs:
                    extractor = _get_baidu_ocr_extractor()
                    if extractor is not None:
                        try:
                            img_bytes = extractor.pdf_page_to_image(pdf_path, page_index)
                            if img_bytes:
                                ocr_txt = extractor.extract_text_from_image(img_bytes, use_accurate=False) or ""
                                awbs = _find_awbs_in_text(ocr_txt)
                        except Exception:
                            pass
                if not awbs:
                    missing_pages.append({"file": os.path.basename(pdf_path), "page": page_index + 1, "reason": "未识别到AWB"})
                    logger.debug(f"未识别到AWB: 文件={os.path.basename(pdf_path)} 页={page_index+1}")
                    continue
                # Arrival-date recognition (text first, OCR fallback).
                arrived = _extract_date_arrived_from_page(pdf_path, page_index)
                # Dedup and append per AWB.
                for awb in awbs:
                    seen_set = awb_date_seen.setdefault(awb, set())
                    keep = False
                    if arrived:
                        if arrived not in seen_set:
                            seen_set.add(arrived)
                            keep = True
                    else:
                        flag = awb_seen_unknown.get(awb, False)
                        if not flag:
                            awb_seen_unknown[awb] = True
                            keep = True
                    if not keep:
                        continue
                    # Append the current page to this AWB's document.
                    target_doc, target_path = _get_awb_doc(awb)
                    target_doc.insert_pdf(doc, from_page=page_index, to_page=page_index)
                    merged_pages += 1
                    merged_files[awb] = target_path
                    logger.debug(f"追加页: AWB={awb} 页={page_index+1} 到 {os.path.basename(target_path)}")
            doc.close()
        except Exception as e:
            logger.debug(f"处理文件失败: {pdf_path} | 错误={e}")
            continue
    # Save every per-AWB document; on failure, retry with a timestamped name.
    # (Cleanup: removed the unused `temp_dir` local the original computed here.)
    for awb, info in awb_docs.items():
        try:
            info["doc"].save(info["path"])
        except Exception:
            try:
                info["doc"].save(os.path.join(output_dir, f"{awb}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"))
            except Exception:
                pass
        finally:
            try:
                info["doc"].close()
            except Exception:
                pass
    logger.info(f"分拆合并完成: 文件数={total_files} | 总页数={total_pages} | 合并页数={merged_pages}")
    return {
        "total_files": total_files,
        "total_pages": total_pages,
        "total_merged_pages": merged_pages,
        "merged_files": merged_files,
        "missing_pages": missing_pages,
    }
def main(argv: List[str] | None = None) -> int:
    """CLI entry point.

    Parses the AWB number plus optional path arguments, converts them into
    a context7 dict via ``_build_context7_from_args`` and runs the merge
    pipeline. Always returns exit code 0.
    """
    parser = argparse.ArgumentParser(description="按AWB合并单页PDF,并以AWB命名输出文件")
    parser.add_argument("--awb", dest="awb", required=True, help="目标AWB编号,例如 436-123456")
    parser.add_argument("--index", dest="index", default=None, help="索引CSV路径,默认 POD/pod_index.csv")
    parser.add_argument("--pages-dir", dest="pages_dir", default=None, help="单页PDF目录,默认 POD/pages")
    parser.add_argument("--output-dir", dest="output_dir", default=None, help="输出目录,默认 POD")
    args = parser.parse_args(argv)
    merge_result = merge_awb_pages(_build_context7_from_args(args))
    if not merge_result.get("output"):
        # No page containing the requested AWB was found.
        print(f"未找到包含AWB的页面:{args.awb}")
        return 0
    print(f"已生成:{merge_result['output']} | 合并页数={merge_result['merged_pages']}")
    return 0
if __name__ == "__main__":
    # Script entry: propagate main()'s return code as the process exit status.
    sys.exit(main())
{
"baidu_ocr_app_id": "118782515",
"baidu_ocr_api_key": "gWnGCmjJYzaYwhph8sJEdiRJ",
"baidu_ocr_secret_key": "mjgUUgbxXK8UHcRi5MTlPrb4BWM8NrOu",
"ocr_enabled": true,
"ocr_timeout": 30,
"max_retries": 3
}
\ No newline at end of file
#!/usr/bin/env python3
"""
百度OCR配置文件
用于管理百度OCR API的相关配置参数
"""
import os
import json
from typing import Dict, Optional
class BaiduOCRConfig:
    """Baidu OCR API configuration manager.

    Resolution order for a key: environment variable (upper-cased key name),
    then the JSON config file, then the built-in defaults.

    Fix over the previous version: the typed getters (`get_timeout`,
    `get_max_retries`, `is_ocr_enabled`) now coerce environment-variable
    strings so their annotated return types actually hold.
    """

    def __init__(self, config_file: str = None):
        """Initialize the configuration manager.

        Args:
            config_file: path of the JSON config file; defaults to
                ``baidu_ocr_config.json`` next to this module.
        """
        if config_file is None:
            config_file = os.path.join(os.path.dirname(__file__), 'baidu_ocr_config.json')
        self.config_file = config_file
        self._config = self._load_config()

    @staticmethod
    def _coerce_int(value, fallback: int) -> int:
        """Best-effort int conversion (env-var overrides arrive as strings)."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return fallback

    def _load_config(self) -> Dict:
        """Load the configuration: defaults merged with the JSON file.

        Returns:
            The effective configuration dict. A missing file is created with
            the defaults; an unreadable file only triggers a warning and the
            defaults are used.
        """
        default_config = {
            "baidu_ocr_app_id": "118782515",
            "baidu_ocr_api_key": "gWnGCmjJYzaYwhph8sJEdiRJ",
            "baidu_ocr_secret_key": "mjgUUgbxXK8UHcRi5MTlPrb4BWM8NrOu",
            "ocr_enabled": True,
            "ocr_timeout": 30,
            "max_retries": 3
        }
        if os.path.exists(self.config_file):
            try:
                with open(self.config_file, 'r', encoding='utf-8') as f:
                    file_config = json.load(f)
                # File values override the built-in defaults.
                default_config.update(file_config)
            except (json.JSONDecodeError, IOError) as e:
                print(f"警告:无法读取配置文件 {self.config_file}: {e}")
                print("使用默认配置")
        else:
            # First run: persist the defaults so the file exists next time.
            self._save_config(default_config)
        return default_config

    def _save_config(self, config: Dict) -> None:
        """Persist a configuration dict to the JSON file (warn on IO errors).

        Args:
            config: configuration dict to write.
        """
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=4, ensure_ascii=False)
        except IOError as e:
            print(f"警告:无法保存配置文件 {self.config_file}: {e}")

    def get(self, key: str, default=None):
        """Return a raw configuration value; environment variables win.

        Args:
            key: configuration key (the env-var name is the upper-cased key).
            default: value returned when the key is absent everywhere.

        Returns:
            The raw value (env-var values are returned as strings).
        """
        env_value = os.getenv(key.upper())
        if env_value:
            return env_value
        return self._config.get(key, default)

    def set(self, key: str, value) -> None:
        """Set a configuration value and persist the whole config to disk.

        Args:
            key: configuration key.
            value: configuration value.
        """
        self._config[key] = value
        self._save_config(self._config)

    def get_app_id(self) -> str:
        """Return the Baidu OCR App ID."""
        return self.get('baidu_ocr_app_id', '')

    def get_api_key(self) -> str:
        """Return the Baidu OCR API Key."""
        return self.get('baidu_ocr_api_key', '')

    def get_secret_key(self) -> str:
        """Return the Baidu OCR Secret Key."""
        return self.get('baidu_ocr_secret_key', '')

    def is_ocr_enabled(self) -> bool:
        """Return whether OCR is enabled.

        Env-var strings such as '1'/'true'/'yes'/'on' are honored; previously
        the raw string was returned, breaking the bool contract.
        """
        value = self.get('ocr_enabled', True)
        if isinstance(value, str):
            return value.strip().lower() in ('1', 'true', 'yes', 'on')
        return bool(value)

    def get_timeout(self) -> int:
        """Return the OCR request timeout in seconds, always as an int."""
        return self._coerce_int(self.get('ocr_timeout', 30), 30)

    def get_max_retries(self) -> int:
        """Return the maximum retry count, always as an int."""
        return self._coerce_int(self.get('max_retries', 3), 3)

    def is_configured(self) -> bool:
        """Check whether Baidu OCR is fully configured.

        Returns:
            True when app id, API key and secret key are all non-empty.
        """
        app_id = self.get_app_id()
        api_key = self.get_api_key()
        secret_key = self.get_secret_key()
        return bool(app_id and api_key and secret_key)

    def get_all_config(self) -> Dict:
        """Return a shallow copy of the effective configuration."""
        return self._config.copy()

    def update_config(self, config_dict: Dict, save_to_file: bool = True) -> None:
        """Bulk-update the configuration.

        Args:
            config_dict: keys/values to merge in.
            save_to_file: whether to persist the merged config to the JSON file.
        """
        self._config.update(config_dict)
        if save_to_file:
            self._save_config(self._config)
# Module-level singleton configuration instance.
# NOTE: constructed at import time — may create the default JSON config file on disk.
baidu_ocr_config = BaiduOCRConfig()
def get_baidu_ocr_config() -> BaiduOCRConfig:
    """Return the shared module-level ``BaiduOCRConfig`` instance.

    Returns:
        The singleton BaiduOCRConfig created at import time.
    """
    return baidu_ocr_config
def check_baidu_ocr_config() -> bool:
    """Return True when Baidu OCR is usable.

    Checks, in order: credential completeness, the OCR-enabled switch, and
    whether the baidu-aip SDK imports and a client can be constructed.
    Any failure is reported on stdout/stderr and yields False.
    """
    try:
        cfg = get_baidu_ocr_config()
        # Credentials must be complete and the OCR feature switched on.
        if not (cfg.is_configured() and cfg.is_ocr_enabled()):
            return False
        try:
            from aip import AipOcr
            # Successfully constructing a client is taken as "configuration usable".
            AipOcr(
                cfg.get_app_id(),
                cfg.get_api_key(),
                cfg.get_secret_key()
            )
            return True
        except ImportError as e:
            import sys
            print(f"警告: 百度OCR SDK (baidu-aip) 导入失败: {e}", file=sys.stderr)
            print("请运行: pip install baidu-aip", file=sys.stderr)
            # When running inside Docker, suggest rebuilding the image.
            if os.path.exists('/.dockerenv'):
                print("提示: 检测到Docker环境,请尝试重新构建镜像: docker build --no-cache -t david-customs-data .", file=sys.stderr)
            return False
        except Exception as e:
            print(f"警告: 百度OCR客户端初始化失败: {e}")
            return False
    except Exception as e:
        print(f"警告: 百度OCR配置检查失败: {e}")
        return False
def get_config_status() -> Dict:
    """Return a detailed status report of the Baidu OCR configuration.

    Sensitive credentials are reduced to 8-character previews; the report
    also records whether the baidu-aip SDK can be imported.
    """
    cfg = get_baidu_ocr_config()

    def _preview(secret: str) -> str:
        # Show at most the first 8 characters of a credential.
        return secret[:8] + '...' if len(secret) > 8 else secret

    status = {
        'app_id_configured': bool(cfg.get_app_id()),
        'api_key_configured': bool(cfg.get_api_key()),
        'secret_key_configured': bool(cfg.get_secret_key()),
        'ocr_enabled': cfg.is_ocr_enabled(),
        'timeout': cfg.get_timeout(),
        'max_retries': cfg.get_max_retries(),
        'fully_configured': cfg.is_configured(),
    }
    # Add redacted previews only for configured values.
    if status['app_id_configured']:
        status['app_id_preview'] = _preview(cfg.get_app_id())
    if status['api_key_configured']:
        status['api_key_preview'] = _preview(cfg.get_api_key())
    if status['secret_key_configured']:
        status['secret_key_preview'] = _preview(cfg.get_secret_key())
    # SDK availability check.
    try:
        from aip import AipOcr  # noqa: F401
        status['sdk_available'] = True
    except ImportError:
        status['sdk_available'] = False
    return status
if __name__ == "__main__":
    # Manual smoke test: print the effective configuration.
    config = get_baidu_ocr_config()
    print("百度OCR配置测试:")
    print(f"App ID: {config.get_app_id()}")
    print(f"API Key: {config.get_api_key()}")
    print(f"Secret Key: {config.get_secret_key()}")
    print(f"OCR启用: {config.is_ocr_enabled()}")
    print(f"超时时间: {config.get_timeout()}秒")
    print(f"最大重试: {config.get_max_retries()}次")
    print(f"配置完整: {config.is_configured()}")
\ No newline at end of file
"""
百度OCR提取器 - 使用百度OCR API进行文字识别和提单号提取
"""
import re
import io
import hashlib
import logging
from typing import List, Dict, Optional, Tuple
from PIL import Image, ImageEnhance
import fitz # PyMuPDF
# 导入新的配置系统
from .baidu_ocr_config import get_baidu_ocr_config, check_baidu_ocr_config
# Compatibility with the legacy configuration system.
try:
    from config import OCR_OPTIONS, PDF_TO_IMAGE_CONFIG, BILL_NUMBER_CONFIG, CACHE_CONFIG
except ImportError:
    # Legacy config module absent: fall back to built-in defaults.
    OCR_OPTIONS = {"language_type": "CHN_ENG", "detect_direction": "true", "probability": "false"}
    PDF_TO_IMAGE_CONFIG = {"dpi": 200, "format": "PNG", "scale_factor": 2.0}
    # Bug fix: _validate_bill_number() reads 'prefix' and 'required_digits';
    # the previous fallback omitted them, raising KeyError at runtime when
    # the legacy config module was missing.
    BILL_NUMBER_CONFIG = {
        "patterns": [r'\b\d{3}-\d{8,10}\b', r'\b\d{3}-\d{7,9}\b'],
        "min_length": 10,
        "max_length": 15,
        "prefix": "436",
        "required_digits": 7,
    }
    CACHE_CONFIG = {"max_size": 1000}
class OCRCache:
    """Cache for OCR results keyed by the MD5 digest of the image bytes.

    When the cache is full, the least-frequently-accessed entry is evicted.
    """

    def __init__(self, max_size: int = 1000):
        self.cache = {}
        self.max_size = max_size
        self.access_count = {}

    def get_cache_key(self, image_data: bytes) -> str:
        """Return the MD5 hex digest used as the cache key for an image."""
        return hashlib.md5(image_data).hexdigest()

    def get_cached_result(self, image_data: bytes) -> Optional[str]:
        """Return the cached OCR text for an image, or None on a miss."""
        key = self.get_cache_key(image_data)
        if key not in self.cache:
            return None
        # Count the hit so eviction prefers rarely-used entries.
        self.access_count[key] = self.access_count.get(key, 0) + 1
        return self.cache[key]

    def cache_result(self, image_data: bytes, result: str):
        """Store an OCR result, evicting the least-used entry when full."""
        if len(self.cache) >= self.max_size:
            victim = min(self.access_count.keys(), key=lambda k: self.access_count[k])
            del self.cache[victim]
            del self.access_count[victim]
        key = self.get_cache_key(image_data)
        self.cache[key] = result
        self.access_count[key] = 1
class OCRMonitor:
    """Collects OCR call statistics: API calls, cache hits, successes, errors."""

    def __init__(self):
        self.call_count = 0
        self.success_count = 0
        self.error_count = 0
        self.cache_hit_count = 0
        self.errors = []

    def record_call(self, success: bool = True, error_msg: str = None, from_cache: bool = False):
        """Record one request; cache hits bypass the API counters entirely."""
        if from_cache:
            self.cache_hit_count += 1
            return
        self.call_count += 1
        if success:
            self.success_count += 1
            return
        self.error_count += 1
        if error_msg:
            self.errors.append(error_msg)

    def get_stats(self) -> Dict:
        """Return aggregate statistics including hit/success/error rates."""
        total_requests = self.call_count + self.cache_hit_count
        api_calls = self.call_count
        return {
            'total_requests': total_requests,
            'api_calls': api_calls,
            'cache_hits': self.cache_hit_count,
            'cache_hit_rate': self.cache_hit_count / total_requests if total_requests > 0 else 0,
            'success_rate': self.success_count / api_calls if api_calls > 0 else 0,
            'error_rate': self.error_count / api_calls if api_calls > 0 else 0,
            'recent_errors': self.errors[-5:],  # last five error messages
        }
class BaiduOCRExtractor:
    """Baidu-OCR-based extractor for page text and bill (AWB) numbers.

    Fix over the previous version: `_validate_bill_number` accesses the
    'prefix'/'required_digits' keys of BILL_NUMBER_CONFIG via `.get()` with
    defaults — the fallback config (used when the legacy `config` module is
    absent) does not define them, which previously raised KeyError.
    """

    def __init__(self, config: Dict = None):
        """Initialize the OCR extractor.

        Notes:
        - Availability is decided by the unified `check_baidu_ocr_config()` check.
        - The Baidu AIP SDK is imported lazily to avoid environment-dependent
          module-level import warnings.
        - A result cache and a call monitor are installed for performance and
          stability.

        Args:
            config: optional extra options dict (kept for future use).
        """
        # New configuration system
        self.ocr_config = get_baidu_ocr_config()
        self.config = config or {}
        self.client = None
        # Result cache
        cache_size = CACHE_CONFIG.get('max_size', 1000) if isinstance(CACHE_CONFIG, dict) else 1000
        self.cache = OCRCache(max_size=cache_size)
        self.monitor = OCRMonitor()
        # Logging setup
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        # Client init (lazy SDK import + unified availability check)
        self._init_client()
        # Bill-number regex patterns
        self.bill_patterns = [
            r'436[-\s]*(\d{6,8})',  # standard form: 436-xxxxxxxx
            r'(\d{3}[-\s]*\d{6,8})',  # generic form: xxx-xxxxxxxx
            r'436\s*(\d{6,8})',  # no separator: 436xxxxxxxx
        ]

    def _init_client(self):
        """Initialize the Baidu OCR client.

        - Uses `check_baidu_ocr_config()` to decide whether SDK + credentials
          are usable.
        - Performs a lazy `from aip import AipOcr` to avoid cross-environment
          false alarms from a module-level import.
        - Never raises: on failure it only logs, so callers can probe
          availability via `is_available()`.
        """
        try:
            if not check_baidu_ocr_config():
                self.logger.warning("百度OCR配置或SDK不可用")
                self.client = None
                return
            # Lazy AIP SDK import (avoids module-level import environment issues)
            from aip import AipOcr  # type: ignore
            # Pull credentials from the configuration system
            app_id = self.ocr_config.get_app_id()
            api_key = self.ocr_config.get_api_key()
            secret_key = self.ocr_config.get_secret_key()
            self.client = AipOcr(app_id, api_key, secret_key)
            self.logger.info("百度OCR客户端初始化成功,client=%s", self.client)
        except ImportError:
            self.logger.error("未检测到 baidu-aip SDK,请先安装: python3 -m pip install baidu-aip")
            self.client = None
        except Exception as e:
            self.logger.error(f"百度OCR客户端初始化失败: {e}")
            self.client = None

    def is_available(self) -> bool:
        """Return True when the OCR client initialized successfully.

        Relies solely on client state — no module-level global flags, so the
        answer cannot drift across environments.
        """
        return self.client is not None

    def preprocess_image(self, image_data: bytes) -> bytes:
        """Preprocess an image (grayscale + contrast/sharpness boost) to
        improve OCR accuracy; returns the original bytes on failure."""
        try:
            image = Image.open(io.BytesIO(image_data))
            # Convert to grayscale
            if image.mode != 'L':
                image = image.convert('L')
            # Boost contrast
            enhancer = ImageEnhance.Contrast(image)
            image = enhancer.enhance(1.5)
            # Boost sharpness
            enhancer = ImageEnhance.Sharpness(image)
            image = enhancer.enhance(1.2)
            # Back to bytes
            output = io.BytesIO()
            image.save(output, format='PNG')
            return output.getvalue()
        except Exception as e:
            self.logger.warning(f"图片预处理失败: {e}")
            return image_data

    def extract_text_from_image(self, image_data: bytes, use_accurate: bool = False) -> str:
        """OCR an image and return the full recognized text.

        - Preprocesses (and compresses when needed) the image before calling
          Baidu OCR.
        - Logs error_code/error_msg details when the response carries no
          `words_result`.
        - When the basic call returns empty and accurate mode was not
          requested, retries once with the high-accuracy endpoint.
        - Oversized images (>4MB) are JPEG-compressed to avoid empty results.

        Returns: the recognized text, or "" on failure.
        """
        if not self.is_available():
            return ""
        # Cache lookup
        if self.cache:
            cached_result = self.cache.get_cached_result(image_data)
            if cached_result is not None:
                self.monitor.record_call(success=True, from_cache=True)
                return cached_result
        # Preprocess the image
        processed_image = self.preprocess_image(image_data)
        original_size = len(image_data) if image_data else 0
        processed_size = len(processed_image) if processed_image else 0
        # Compress when needed (Baidu OCR recommends single images < 4MB)
        payload = self._compress_image_for_ocr(processed_image)
        payload_size = len(payload) if payload else 0
        try:
            # Choose the OCR endpoint
            if use_accurate:
                # SDK method name fix: accurateBasic is the high-accuracy general OCR
                result = self.client.accurateBasic(payload, OCR_OPTIONS)
            else:
                result = self.client.basicGeneral(payload, OCR_OPTIONS)
            # Parse the response
            text = self._parse_ocr_result(result)
            if not text:
                # No words_result: log the error details to aid debugging
                error_code = result.get('error_code') if isinstance(result, dict) else None
                error_msg = result.get('error_msg') if isinstance(result, dict) else None
                self.logger.warning(
                    f"OCR返回空结果: error_code={error_code} error_msg={error_msg} "
                    f"size(original/processed/payload)={original_size}/{processed_size}/{payload_size}"
                )
                # Retry once with the accurate endpoint if it wasn't used yet
                if not use_accurate:
                    try:
                        # Empty result: automatically switch to high-accuracy OCR
                        retry_result = self.client.accurateBasic(payload, OCR_OPTIONS)
                        text = self._parse_ocr_result(retry_result)
                        if not text:
                            retry_code = retry_result.get('error_code') if isinstance(retry_result, dict) else None
                            retry_msg = retry_result.get('error_msg') if isinstance(retry_result, dict) else None
                            self.logger.warning(
                                f"accurateGeneral重试仍为空: error_code={retry_code} error_msg={retry_msg} "
                                f"payload_size={payload_size}"
                            )
                    # Renamed from `re` to avoid shadowing the regex module
                    except Exception as retry_exc:
                        self.logger.error(f"accurateGeneral重试异常: {retry_exc}")
            # Cache the (possibly empty) result
            if self.cache:
                self.cache.cache_result(image_data, text)
            self.monitor.record_call(success=True)
            return text
        except Exception as e:
            error_msg = f"OCR识别失败: {e}"
            self.logger.error(error_msg)
            self.monitor.record_call(success=False, error_msg=str(e))
            return ""

    def _parse_ocr_result(self, result: Dict) -> str:
        """Join the `words_result` lines of a Baidu OCR response into one string.

        Returns "" when the structure lacks `words_result` or it is empty.
        """
        if 'words_result' not in result:
            return ""
        text_lines = []
        for item in result['words_result']:
            if 'words' in item:
                text_lines.append(item['words'])
        return '\n'.join(text_lines)

    def _compress_image_for_ocr(self, image_data: bytes, max_bytes: int = 4_000_000, max_dim: int = 4096) -> bytes:
        """Downsample/compress an image to satisfy OCR size and pixel limits.

        - When the data exceeds `max_bytes` or either dimension exceeds
          `max_dim`, the image is resized (aspect ratio kept) and
          JPEG-compressed.
        - JPEG quality steps down from 90 to 60 until the payload fits.

        Returns: the compressed bytes (or the original data on failure).
        """
        try:
            if not image_data:
                return image_data
            img = Image.open(io.BytesIO(image_data))
            # JPEG needs RGB
            if img.mode != 'RGB':
                img = img.convert('RGB')
            # Cap the longest edge at max_dim (keep aspect ratio)
            w, h = img.size
            longest = max(w, h)
            if longest > max_dim:
                ratio = max_dim / float(longest)
                new_size = (int(w * ratio), int(h * ratio))
                img = img.resize(new_size, Image.LANCZOS)
                w, h = img.size
            # Already within limits: repack as light JPEG for compatibility
            if len(image_data) <= max_bytes and max(w, h) <= max_dim:
                buf = io.BytesIO()
                img.save(buf, format='JPEG', quality=90, optimize=True)
                data = buf.getvalue()
                return data
            # Step quality down until the size limit is met
            for quality in (90, 85, 80, 75, 70, 65, 60):
                buf = io.BytesIO()
                img.save(buf, format='JPEG', quality=quality, optimize=True)
                data = buf.getvalue()
                if len(data) <= max_bytes:
                    return data
            # Fall back to the lowest-quality attempt
            return data
        except Exception as e:
            self.logger.warning(f"图片压缩失败,使用原始数据: {e}")
            return image_data

    def pdf_page_to_image(self, pdf_path: str, page_num: int) -> Optional[bytes]:
        """Render one PDF page to image bytes (scaled per PDF_TO_IMAGE_CONFIG)."""
        try:
            doc = fitz.open(pdf_path)
            if page_num >= doc.page_count:
                self.logger.warning(f"页面号 {page_num} 超出范围,PDF共 {doc.page_count} 页")
                doc.close()
                return None
            page = doc.load_page(page_num)
            # Scale up to improve image quality
            mat = fitz.Matrix(PDF_TO_IMAGE_CONFIG['scale_factor'], PDF_TO_IMAGE_CONFIG['scale_factor'])
            pix = page.get_pixmap(matrix=mat)
            # Convert to raw bytes
            img_data = pix.tobytes(PDF_TO_IMAGE_CONFIG['format'].lower())
            doc.close()
            return img_data
        except Exception as e:
            self.logger.error(f"PDF页面转图片失败: {e}")
            return None

    def find_bill_numbers_in_text(self, text: str) -> List[str]:
        """Find bill numbers in text using the instance's regex patterns."""
        bill_numbers = set()
        for pattern in self.bill_patterns:
            matches = re.finditer(pattern, text, re.IGNORECASE)
            for match in matches:
                if len(match.groups()) > 0:
                    # Digit portion captured by the group
                    number_part = match.group(1)
                    # Rebuild the full bill number
                    bill_number = f"436-{number_part}"
                else:
                    # Whole-pattern match
                    bill_number = match.group(0)
                # Normalize separators to a single hyphen
                bill_number = re.sub(r'[-\s]+', '-', bill_number)
                if not bill_number.startswith('436'):
                    continue
                # Validate before accepting
                if self._validate_bill_number(bill_number):
                    bill_numbers.add(bill_number)
        return list(bill_numbers)

    def _validate_bill_number(self, bill_number: str) -> bool:
        """Validate a candidate bill number against BILL_NUMBER_CONFIG.

        Fix: 'prefix' and 'required_digits' are read via `.get()` with
        defaults because the fallback BILL_NUMBER_CONFIG may not define them.
        """
        # Strip separators before validating
        clean_number = re.sub(r'[-\s]', '', bill_number)
        # Overall length bounds
        if len(clean_number) < BILL_NUMBER_CONFIG['min_length'] or len(clean_number) > BILL_NUMBER_CONFIG['max_length']:
            return False
        # Required prefix (default 436)
        if not clean_number.startswith(BILL_NUMBER_CONFIG.get('prefix', '436')):
            return False
        # Serial length after the 3-digit prefix
        number_part = clean_number[3:]
        if len(number_part) < BILL_NUMBER_CONFIG.get('required_digits', 7):
            return False
        # Serial must be all digits
        if not number_part.isdigit():
            return False
        return True

    def extract_bills_from_pdf_page(self, pdf_path: str, page_num: int, use_accurate: bool = False) -> List[str]:
        """OCR one PDF page (0-based page_num) and return the bill numbers on it."""
        # Render the page to an image
        image_data = self.pdf_page_to_image(pdf_path, page_num)
        if not image_data:
            return []
        # OCR the image
        text = self.extract_text_from_image(image_data, use_accurate)
        if not text:
            return []
        # Parse bill numbers out of the text
        bill_numbers = self.find_bill_numbers_in_text(text)
        self.logger.info(f"页面 {page_num} OCR提取到 {len(bill_numbers)} 个提单号")
        return bill_numbers

    def extract_bills_from_pdf(self, pdf_path: str, max_pages: int = None) -> Tuple[List[str], Dict]:
        """OCR an entire PDF and return (unique bill numbers, statistics)."""
        all_bills = set()
        page_results = {}
        try:
            doc = fitz.open(pdf_path)
            total_pages = doc.page_count
            doc.close()
            # Optional page cap
            if max_pages:
                total_pages = min(total_pages, max_pages)
            self.logger.info(f"开始OCR处理PDF,共 {total_pages} 页")
            for page_num in range(total_pages):
                bills = self.extract_bills_from_pdf_page(pdf_path, page_num)
                page_results[page_num] = bills
                all_bills.update(bills)
                # Progress report every 5 pages
                if (page_num + 1) % 5 == 0:
                    self.logger.info(f"已处理 {page_num + 1}/{total_pages} 页")
            final_bills = list(all_bills)
            # Aggregate statistics
            stats = {
                'total_pages': total_pages,
                'total_bills': len(final_bills),
                'page_results': page_results,
                'ocr_stats': self.monitor.get_stats()
            }
            self.logger.info(f"OCR提取完成,共找到 {len(final_bills)} 个唯一提单号")
            return final_bills, stats
        except Exception as e:
            self.logger.error(f"PDF OCR提取失败: {e}")
            return [], {'error': str(e)}

    def get_monitor_stats(self) -> Dict:
        """Return the monitor's aggregate statistics."""
        return self.monitor.get_stats()
def test_ocr_extractor():
    """Manual smoke test: availability, text parsing and monitor stats."""
    extractor = BaiduOCRExtractor()
    print("OCR提取器测试")
    print(f"OCR可用性: {extractor.is_available()}")
    if not extractor.is_available():
        return
    # Exercise the text-based bill-number parser
    test_text = "提单号:436-12345678\n另一个提单号:436 87654321"
    bills = extractor.find_bill_numbers_in_text(test_text)
    print(f"测试文本提取结果: {bills}")
    # Show the call statistics
    stats = extractor.get_monitor_stats()
    print(f"统计信息: {stats}")
if __name__ == "__main__":
    # Run the manual smoke test when executed directly.
    test_ocr_extractor()
"""
POD PDF 索引器
----------------
功能:
- 扫描 POD 目录下的所有 PDF 文件;
- 按页提取文本并识别 AWB 编号(如 436-XXXXXXXX 或 3位前缀+8位序列);
- 生成两个索引 CSV:
1) pod_index.csv:逐页索引(文件名、页码、AWB列表);
2) pod_awb_summary.csv:按文件汇总 AWB(文件名、AWB列表、AWB总数、页数)。
使用:
- 直接运行:python POD/pod_indexer.py
- 指定目录与输出:python POD/pod_indexer.py --dir POD --index pod_index.csv --summary pod_awb_summary.csv
说明:
- 优先支持 context7 传参:可通过调用 index_pod_directory(context7={...}) 传入参数;
- 文本提取使用 PyMuPDF(fitz);若扫描件为纯图像且无 OCR,这里不会做 OCR 回退,只记录空文本页。
"""
from __future__ import annotations
import os
import re
import csv
import sys
import argparse
import logging
from dataclasses import dataclass
from typing import List, Dict, Any, Tuple, Optional
import uuid
try:
    import fitz  # PyMuPDF
except ImportError as e:
    # Chain the original ImportError so the root cause remains visible.
    raise RuntimeError("需要安装 PyMuPDF (fitz)。请在环境中安装:pip install PyMuPDF") from e
# Make the repository root importable so the OCR modules resolve when run as a script.
_repo_root = os.path.dirname(os.path.dirname(__file__))
if _repo_root and _repo_root not in sys.path:
    sys.path.insert(0, _repo_root)
# Module-level logger (same style as the release-notes splitter).
logger = logging.getLogger(__name__)
if not logger.handlers:
    # Attach a stream handler only once to avoid duplicate log lines.
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
# ========================= 日期识别与规范化 =========================
def _normalize_date(date_str: str) -> Optional[str]:
"""规范化日期为 `YYYY-MM-DD`
函数说明:
- 支持 `dd/mm/yyyy`、`dd-mm-yyyy`、`yyyy/mm/dd`、`yyyy-mm-dd` 等常见格式。
- 解析失败返回 None。
"""
if not date_str:
return None
s = date_str.strip().replace('.', '/').replace('-', '/')
parts = s.split('/')
if len(parts) != 3:
return None
a, b, c = parts
try:
# 判断是否为 yyyy/mm/dd
if len(a) == 4:
yyyy = int(a); mm = int(b); dd = int(c)
else:
dd = int(a); mm = int(b); yyyy = int(c)
if not (1 <= mm <= 12 and 1 <= dd <= 31 and 1900 <= yyyy <= 2100):
return None
return f"{yyyy:04d}-{mm:02d}-{dd:02d}"
except Exception:
return None
def _extract_date_from_text(text: str) -> Optional[str]:
"""从给定文本中提取到达日期并规范化
函数说明:
- 优先匹配 "Date Arrived" 锚点后的日期;
- 其次匹配全文中的第一个日期样式;
- 返回规范化结果或 None。
"""
if not text:
return None
txt = text.strip()
import re
anchor = re.search(r"Date\s*Arrived[:\s]*([\d/\-.]{8,10})", txt, flags=re.IGNORECASE)
if anchor:
norm = _normalize_date(anchor.group(1))
if norm:
return norm
m = re.search(r"\b(\d{1,2}[\-/\.]\d{1,2}[\-/\.]\d{2,4}|\d{4}[\-/\.]\d{1,2}[\-/\.]\d{1,2})\b", txt)
if m:
norm = _normalize_date(m.group(0))
if norm:
return norm
return None
def _extract_date_arrived_for_single_page(page_pdf_path: str, extractor=None) -> Optional[str]:
"""从单页PDF提取到达日期(文本优先,OCR回退)
函数说明:
- 先用 PyMuPDF 提取整页文本并尝试解析日期;
- 若未识别,且提供了 OCR 提取器且可用,则转图片进行 OCR 识别再解析日期;
- 返回规范化 `YYYY-MM-DD` 或 None。
"""
try:
doc = fitz.open(page_pdf_path)
if doc.page_count == 0:
doc.close(); return None
page = doc.load_page(0)
text = page.get_text("text") or ""
doc.close()
norm = _extract_date_from_text(text)
if norm:
return norm
# OCR 回退
if extractor and getattr(extractor, "is_available", lambda: False)():
try:
img_bytes = extractor.pdf_page_to_image(page_pdf_path, 0)
if img_bytes:
ocr_text = extractor.extract_text_from_image(img_bytes, use_accurate=False) or ""
return _extract_date_from_text(ocr_text)
except Exception:
pass
return None
except Exception:
return None
# ========================= 数据结构定义 =========================
@dataclass
class PageIndex:
    """Per-page index entry.

    - file: PDF file name (without path)
    - page: page number (1-based)
    - index_id: stable unique index ID (also the saved single-page PDF file name)
    - page_pdf: absolute path of the saved single-page PDF (None if not saved)
    - awbs: AWB numbers recognized on this page (de-duplicated, normalized)
    """
    file: str
    page: int
    index_id: str
    page_pdf: Optional[str]
    awbs: List[str]
@dataclass
class FileSummary:
    """File-level summary.

    - file: PDF file name (without path)
    - awbs: all AWB numbers recognized in the file (de-duplicated, normalized)
    - total_awbs: total number of distinct AWBs
    - total_pages: page count
    """
    file: str
    awbs: List[str]
    total_awbs: int
    total_pages: int
# ========================= 工具函数 =========================
def compute_index_id(file_name: str, page_no: int) -> str:
    """Return a stable index ID for a (file name, page number) pair.

    The ID is the MD5 hex digest of ``"<file_name>:<page_no>"``, so
    repeated runs always produce the same ID for the same page.
    """
    import hashlib
    digest_input = f"{file_name}:{page_no}".encode("utf-8")
    return hashlib.md5(digest_input).hexdigest()
# ========================= AWB 识别函数 =========================
AWB_PATTERNS = [
    # Standard form: 3-digit prefix + optional hyphen/space + 8-digit serial (e.g. 436-12345678)
    re.compile(r"\b(?P<prefix>\d{3})[-\s]?(?P<number>\d{8})\b"),
    # Looser form: serials of 7-10 digits with or without a separator; normalized afterwards
    re.compile(r"\b(?P<prefix>\d{3})[-\s]?(?P<number>\d{7,10})\b")
]
def normalize_awb(prefix: str, number: str) -> str:
    """Normalize an AWB number to the canonical ``XXX-XXXXXXXX`` form.

    Non-digit characters are stripped from both parts; serials longer than
    8 digits are truncated to the standard 8. Returns "" when either part
    is empty after cleaning.
    """
    digits_prefix = re.sub(r"\D", "", prefix or "")
    digits_number = re.sub(r"\D", "", number or "")
    if not (digits_prefix and digits_number):
        return ""
    # Standard AWB serial length is 8 digits; keep the leading 8 of longer runs.
    serial = digits_number[:8] if len(digits_number) >= 8 else digits_number
    return f"{digits_prefix}-{serial}"
def find_awb_numbers_in_text(text: str) -> List[str]:
    """Extract AWB numbers from arbitrary text.

    Returns a de-duplicated list of normalized AWBs (``XXX-XXXXXXXX``),
    preserving first-seen order across all patterns.
    """
    if not text:
        return []
    collected: List[str] = []
    known = set()
    for pattern in AWB_PATTERNS:
        for match in pattern.finditer(text):
            candidate = normalize_awb(match.group("prefix"), match.group("number"))
            if candidate and candidate not in known:
                known.add(candidate)
                collected.append(candidate)
    return collected
# ========================= PDF 索引实现 =========================
def index_pdf_file(pdf_path: str, save_pages: bool = False, pages_dir: Optional[str] = None,
                   use_baidu_ocr: bool = True) -> Tuple[List[PageIndex], FileSummary]:
    """
    Index a single PDF file.
    - pdf_path: absolute path of the PDF file
    - save_pages: whether to also save each page as a standalone single-page PDF
    - pages_dir: output directory for single-page PDFs (used when save_pages=True)
    - use_baidu_ocr: whether to fall back to Baidu OCR when a page has no text / no AWB
    - returns: (per-page index list, file-level summary)
    """
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"PDF 文件不存在: {pdf_path}")
    file_name = os.path.basename(pdf_path)
    doc = fitz.open(pdf_path)
    page_indices: List[PageIndex] = []
    all_awbs_set = set()
    # Construct the Baidu OCR extractor lazily so a missing SDK never breaks indexing.
    extractor = None
    if use_baidu_ocr:
        try:
            from .baidu_ocr_extractor import BaiduOCRExtractor  # deferred import
            extractor = BaiduOCRExtractor()
        except Exception:
            extractor = None
    for page_index in range(doc.page_count):
        page = doc.load_page(page_index)
        text = page.get_text("text") or ""
        awbs = find_awb_numbers_in_text(text)
        # Arrival date: try the embedded text layer first.
        arrived = _extract_date_from_text(text)
        # OCR fallback when no AWB was found in the embedded text.
        if use_baidu_ocr and extractor and hasattr(extractor, "is_available") and extractor.is_available():
            if not awbs:
                try:
                    img_bytes = extractor.pdf_page_to_image(pdf_path, page_index)
                    if img_bytes:
                        ocr_text = extractor.extract_text_from_image(img_bytes, use_accurate=False)
                        awbs = find_awb_numbers_in_text(ocr_text)
                        # The OCR pass also gets a chance to supply the date.
                        if not arrived:
                            arrived = _extract_date_from_text(ocr_text)
                except Exception:
                    pass
        # De-duplicate while preserving order.
        unique_awbs = []
        seen = set()
        for a in awbs:
            if a not in seen:
                seen.add(a)
                unique_awbs.append(a)
        # Page-level log: recognized AWBs and arrival date ('-' placeholders when absent).
        logger.info(
            f"[页扫描] 文件={file_name} 页={page_index + 1} AWB={', '.join(unique_awbs) if unique_awbs else '-'} 日期={arrived or '-'}"
        )
        # Stable index ID (original file name + page number).
        index_id = compute_index_id(file_name, page_index + 1)
        # Optionally save this page as a standalone PDF.
        page_pdf_path: Optional[str] = None
        if save_pages:
            # Resolve the output directory.
            out_dir = pages_dir or os.path.join(os.path.dirname(pdf_path), "pages")
            os.makedirs(out_dir, exist_ok=True)
            page_pdf_path = os.path.join(out_dir, f"{index_id}.pdf")
            try:
                single_doc = fitz.open()
                single_doc.insert_pdf(doc, from_page=page_index, to_page=page_index)
                # Auto-correct upside-down (180°-rotated) pages.
                try:
                    p = single_doc[0]
                    if (p.rotation % 360) == 180:
                        p.set_rotation((p.rotation + 180) % 360)
                except Exception:
                    pass
                single_doc.save(page_pdf_path)
                single_doc.close()
            except Exception:
                page_pdf_path = None
        page_indices.append(PageIndex(
            file=file_name,
            page=page_index + 1,
            index_id=index_id,
            page_pdf=page_pdf_path,
            awbs=unique_awbs,
        ))
        all_awbs_set.update(unique_awbs)
    summary = FileSummary(
        file=file_name,
        awbs=sorted(all_awbs_set),
        total_awbs=len(all_awbs_set),
        total_pages=doc.page_count,
    )
    doc.close()
    return page_indices, summary
def split_pdf_pages(pdf_path: str, pages_dir: str) -> List[PageIndex]:
    """Split a PDF into single-page PDFs, keeping the output as small as possible.

    Notes:
    - Each page gets a stable index ID (original file name + page number) so
      re-runs never generate duplicates.
    - Pages are saved with compression options (garbage=3, deflate=True,
      clean=True) to shrink file size without losing fidelity.
    - Upside-down pages (rotated 180°) are corrected before saving.

    Args:
    - pdf_path: absolute path of the source PDF
    - pages_dir: output directory for the single-page PDFs

    Returns:
    - List[PageIndex] (awbs left empty, to be filled by a later OCR pass)
    """
    file_name = os.path.basename(pdf_path)
    os.makedirs(pages_dir, exist_ok=True)
    doc = fitz.open(pdf_path)
    results: List[PageIndex] = []
    for page_index in range(doc.page_count):
        # Stable ID: the same (original file name, page) pair always maps to the same index ID.
        index_id = compute_index_id(file_name, page_index + 1)
        page_pdf_path = os.path.join(pages_dir, f"{index_id}.pdf")
        try:
            single_doc = fitz.open()
            single_doc.insert_pdf(doc, from_page=page_index, to_page=page_index)
            # Correct pages that are upside down (rotated 180°).
            try:
                p = single_doc[0]
                if (p.rotation % 360) == 180:
                    p.set_rotation((p.rotation + 180) % 360)
            except Exception:
                pass
            # Save with compression/cleanup while keeping original fidelity:
            # garbage=3 drops unused objects; deflate=True compresses streams;
            # clean=True rebuilds the xref structure.
            try:
                single_doc.save(page_pdf_path, garbage=3, deflate=True, clean=True)
            except TypeError:
                # Older PyMuPDF versions do not support the clean parameter.
                single_doc.save(page_pdf_path, garbage=3, deflate=True)
            single_doc.close()
        except Exception:
            page_pdf_path = None
        results.append(PageIndex(
            file=file_name,
            page=page_index + 1,
            index_id=index_id,
            page_pdf=page_pdf_path,
            awbs=[],
        ))
    doc.close()
    return results
def _ocr_awb_for_single_page(extractor, page_pdf_path: str, log_text: bool = False) -> List[str]:
    """
    Run Baidu OCR on a single-page PDF and extract its AWB numbers.
    - extractor: Baidu OCR extractor instance; page_pdf_path: single-page PDF path;
      log_text: whether to log the full OCR text
    - returns: recognized AWB list (de-duplicated, normalized)
    Notes:
    1) To avoid forcing the 436 prefix, this reuses the extractor's
       "PDF page -> image -> OCR" path and parses the text with the generic
       AWB regex via find_awb_numbers_in_text.
    2) With log_text=True the full OCR text of the page is logged
       (untruncated) to help debug recognition quality.
    """
    if not extractor or not getattr(extractor, "is_available", lambda: False)():
        if log_text:
            logger.info(f"[OCR跳过] 提取器不可用或未初始化 文件={os.path.basename(page_pdf_path)}")
        return []
    try:
        # A single-page PDF's page index is 0; reuse the extractor's render + OCR path.
        img_bytes = extractor.pdf_page_to_image(page_pdf_path, 0)
        if not img_bytes:
            if log_text:
                logger.info(f"[OCR失败] PDF转图片失败 文件={os.path.basename(page_pdf_path)}")
            return []
        ocr_text = extractor.extract_text_from_image(img_bytes, use_accurate=False)
        if log_text:
            # Debug log: dump the full OCR text to judge recognition quality / layout effects.
            logger.debug(f"[OCR全文] 文件={os.path.basename(page_pdf_path)}\n{ocr_text}")
        # Generic AWB parsing; no prefix is enforced.
        awbs = find_awb_numbers_in_text(ocr_text)
        # De-duplicate (find_awb_numbers_in_text already returns normalized values).
        normalized: List[str] = []
        seen = set()
        for a in awbs or []:
            if a and a not in seen:
                seen.add(a)
                normalized.append(a)
        if log_text:
            logger.info(f"[AWB解析] 文件={os.path.basename(page_pdf_path)} 数量={len(normalized)} 值={', '.join(normalized) if normalized else ''}")
        return normalized
    except Exception as e:
        if log_text:
            logger.info(f"[OCR异常] 文件={os.path.basename(page_pdf_path)} 错误={e}")
        return []
def index_pod_directory(context7: Dict[str, Any]) -> Dict[str, Any]:
"""
扫描 POD 目录并生成索引(优先使用 context7 传参)
- 输入: context7 字典,支持键:
- dir_path: 目录路径(默认 'POD')
- output_index_csv: 页级索引 CSV 文件名(默认 'pod_index.csv')
- output_summary_csv: 文件汇总 CSV 文件名(默认 'pod_awb_summary.csv')
- save_pages: 是否另存单页PDF(默认 True)
- page_output_dir: 单页PDF输出目录(默认 POD/pages)
- pipeline_split_first: 是否先拆分再OCR(默认 True)
- 输出: 处理结果字典,包含索引与统计
"""
dir_path = context7.get("dir_path") or os.path.join(os.path.dirname(__file__), "")
# 如果传入的是仓库根目录,默认子目录 POD
if os.path.isdir(dir_path) and os.path.basename(dir_path) != "POD":
candidate = os.path.join(dir_path, "POD")
if os.path.isdir(candidate):
dir_path = candidate
if not os.path.isdir(dir_path):
# 允许直接使用当前文件所在目录的 POD
candidate = os.path.join(os.path.dirname(__file__), "")
if os.path.isdir(candidate):
dir_path = candidate
output_index_csv = context7.get("output_index_csv") or os.path.join(dir_path, "pod_index.csv")
output_summary_csv = context7.get("output_summary_csv") or os.path.join(dir_path, "pod_awb_summary.csv")
save_pages = True if context7.get("save_pages") is None else bool(context7.get("save_pages"))
page_output_dir = context7.get("page_output_dir") or os.path.join(dir_path, "pages")
pipeline_split_first = True if context7.get("pipeline_split_first") is None else bool(context7.get("pipeline_split_first"))
log_ocr_text = bool(context7.get("log_ocr_text"))
logger.info(f"开始索引 POD 目录: {dir_path} | 拆分优先={pipeline_split_first}")
pdf_files = [
os.path.join(dir_path, f)
for f in os.listdir(dir_path)
if f.lower().endswith(".pdf") and os.path.isfile(os.path.join(dir_path, f))
]
logger.info(f"找到 {len(pdf_files)} 个PDF 文件")
all_page_indices: List[PageIndex] = []
all_summaries: List[FileSummary] = []
if pipeline_split_first:
# 先拆分所有PDF到单页
split_pages: List[PageIndex] = []
for pdf_path in sorted(pdf_files):
try:
per_file_pages = split_pdf_pages(pdf_path, page_output_dir)
split_pages.extend(per_file_pages)
logger.info(f"拆分完成: {os.path.basename(pdf_path)} | 生成 {len(per_file_pages)} 页")
except Exception as e:
logger.info(f"拆分失败: {pdf_path}: {e}")
# 再进行OCR识别AWB
extractor = None
bae_mod = None
try:
from . import baidu_ocr_extractor as bae_mod
from .baidu_ocr_extractor import BaiduOCRExtractor
extractor = BaiduOCRExtractor()
except Exception:
extractor = None
bae_mod = None
if log_ocr_text:
try:
available = bool(extractor and getattr(extractor, "is_available", lambda: False)())
except Exception:
available = False
# 打印模块来源与配置状态,帮助定位不可用原因
try:
mod_file = getattr(bae_mod, "__file__", "<unknown>")
except Exception:
mod_file = "<unknown>"
try:
if bae_mod:
from .baidu_ocr_config import get_config_status
cfg = get_config_status()
else:
cfg = {"sdk_available": False, "reason": "baidu_ocr_extractor模块导入失败"}
except Exception as e:
cfg = {"error": str(e)}
try:
sdk_flag = getattr(bae_mod, 'BAIDU_OCR_AVAILABLE', None)
except Exception:
sdk_flag = None
logger.info(f"[OCR状态] BaiduOCRExtractor 可用={available} | SDK可用={sdk_flag} | 模块={mod_file} | 配置={cfg}")
# 汇总到每个原文件
file_awb_map: Dict[str, set] = {}
file_page_count: Dict[str, int] = {}
for pi in split_pages:
awbs = _ocr_awb_for_single_page(extractor, pi.page_pdf, log_text=log_ocr_text) if pi.page_pdf else []
# 日期提取:针对单页PDF进行文本识别,必要时OCR回退
arrived = _extract_date_arrived_for_single_page(pi.page_pdf, extractor) if pi.page_pdf else None
# 去重
unique_awbs = []
seen = set()
for a in awbs:
if a not in seen:
seen.add(a)
unique_awbs.append(a)
pi.awbs = unique_awbs
all_page_indices.append(pi)
# 页级日志:记录识别到的AWB与到达日期(若无则用'-'占位)
logger.info(
f"[页扫描] 文件={pi.file} 页={pi.page} AWB={', '.join(unique_awbs) if unique_awbs else '-'} 日期={arrived or '-'}"
)
# 汇总
file_awb_map.setdefault(pi.file, set()).update(unique_awbs)
file_page_count[pi.file] = file_page_count.get(pi.file, 0) + 1
# 构建文件级汇总
for f in sorted(file_page_count.keys()):
s = FileSummary(file=f, awbs=sorted(file_awb_map.get(f, set())), total_awbs=len(file_awb_map.get(f, set())), total_pages=file_page_count[f])
all_summaries.append(s)
logger.info(f"索引完成: {f} | 页数={s.total_pages} | AWB={s.total_awbs}")
else:
for pdf_path in sorted(pdf_files):
try:
page_indices, summary = index_pdf_file(pdf_path, save_pages=save_pages, pages_dir=page_output_dir, use_baidu_ocr=True)
all_page_indices.extend(page_indices)
all_summaries.append(summary)
logger.info(f"索引完成: {os.path.basename(pdf_path)} | 页数={summary.total_pages} | AWB={summary.total_awbs}")
except Exception as e:
logger.info(f"索引失败: {pdf_path}: {e}")
write_index_csv(all_page_indices, output_index_csv)
write_summary_csv(all_summaries, output_summary_csv)
return {
"dir_path": dir_path,
"index_file": output_index_csv,
"summary_file": output_summary_csv,
"total_pdfs": len(pdf_files),
"total_pages": sum(s.total_pages for s in all_summaries),
"total_awbs": sum(s.total_awbs for s in all_summaries),
}
def write_index_csv(page_indices: List[PageIndex], output_path: str) -> None:
    """
    Write the page-level index CSV.

    - Input: list of ``PageIndex`` records and the output file path.
    - Output: none (a CSV file is created on disk).

    Rows are deduplicated on (original file name, page number). When the same
    page appears more than once, a record that carries AWB numbers is preferred
    over one that does not; among equally-informative records the latest wins.
    """
    # Guard against a bare file name: os.makedirs("") raises FileNotFoundError.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    # Deduplicate on (file, page). Bug fix: the previous else-branch overwrote
    # unconditionally, so a later AWB-less record could clobber an earlier one
    # that had AWBs. Keep the existing record when it has AWBs and the new one
    # does not; otherwise take the newest record.
    dedup_map: Dict[tuple, PageIndex] = {}
    for item in page_indices:
        key = (item.file, item.page)
        existing = dedup_map.get(key)
        if existing is None or item.awbs or not existing.awbs:
            dedup_map[key] = item
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        # Four columns: original file name, page number, split index id, AWB numbers.
        writer.writerow(["file", "page", "index_id", "awbs"])
        for item in sorted(dedup_map.values(), key=lambda x: (x.file, x.page)):
            writer.writerow([item.file, item.page, item.index_id, ", ".join(item.awbs)])
    logger.info(f"已生成索引文件: {output_path}")
def write_summary_csv(summaries: List[FileSummary], output_path: str) -> None:
    """
    Write the file-level summary CSV.

    - Input: list of ``FileSummary`` records and the output file path.
    - Output: none (a CSV file is created on disk).
    """
    # Guard against a bare file name: os.makedirs("") raises FileNotFoundError.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["file", "awb_count", "pages", "awbs"])
        for s in summaries:
            writer.writerow([s.file, s.total_awbs, s.total_pages, ", ".join(s.awbs)])
    logger.info(f"已生成汇总文件: {output_path}")
def _build_context7_from_args(args: argparse.Namespace) -> Dict[str, Any]:
"""
从命令行参数构建 context7 字典
- 输入: argparse 参数命名空间
- 输出: context7(用于统一传参)
"""
ctx = {
"dir_path": args.dir,
"output_index_csv": args.index,
"output_summary_csv": args.summary,
"save_pages": args.save_pages,
"page_output_dir": args.pages_dir,
}
# 是否输出 OCR 全文日志
if hasattr(args, "log_ocr_text"):
ctx["log_ocr_text"] = bool(args.log_ocr_text)
return ctx
def main(argv: List[str] | None = None) -> int:
    """
    CLI entry point.

    Parses the command line, builds the ``context7`` bundle, runs the POD
    directory indexer, and returns 0 as the process exit code. By default the
    directory containing this file is scanned.
    """
    parser = argparse.ArgumentParser(description="POD PDF 索引器:先拆分为单页PDF,再用百度OCR识别AWB并生成索引")
    default_dir = os.path.dirname(__file__)
    parser.add_argument("--dir", dest="dir", default=default_dir, help="POD 目录路径,默认为当前文件所在目录")
    parser.add_argument("--index", dest="index", default=None, help="页级索引 CSV 输出路径,默认 POD/pod_index.csv")
    parser.add_argument("--summary", dest="summary", default=None, help="文件汇总 CSV 输出路径,默认 POD/pod_awb_summary.csv")
    parser.add_argument("--save-pages", dest="save_pages", action="store_true", help="开启按页另存PDF")
    parser.add_argument("--pages-dir", dest="pages_dir", default=None, help="单页PDF输出目录,默认 POD/pages")
    parser.add_argument("--no-split-first", dest="no_split_first", action="store_true", help="关闭'先拆分后OCR'流程,改为原先边索引边拆分")
    parser.add_argument("--log-ocr-text", dest="log_ocr_text", action="store_true", help="将每页OCR识别的全文内容输出到日志,便于调试")
    parsed = parser.parse_args(argv)
    context7 = _build_context7_from_args(parsed)
    # --no-split-first switches back to the legacy "index while splitting" flow.
    if getattr(parsed, "no_split_first", False):
        context7["pipeline_split_first"] = False
    result = index_pod_directory(context7)
    logger.info(f"索引完成:{result}")
    return 0
if __name__ == "__main__":
    # Run the CLI and propagate its return value as the process exit status
    # (raising SystemExit is equivalent to sys.exit()).
    raise SystemExit(main())
<?xml version="1.0" encoding="utf-8"?>
<!--
    Extends the general settings form (res.config.settings) with a
    Baidu OCR configuration section: an enable switch, API credentials,
    and timeout/retry tuning. User-facing labels stay in Chinese.
-->
<odoo>
    <data>
        <record id="res_config_settings_view_form_ocr_inherit" model="ir.ui.view">
            <field name="name">res.config.settings.view.form.inherit.ocr</field>
            <field name="model">res.config.settings</field>
            <field name="inherit_id" ref="base_setup.res_config_settings_view_form"/>
            <field name="arch" type="xml">
                <xpath expr="//div[hasclass('app_settings_block')]/div" position="before">
                    <!-- OCR configuration section -->
                    <div id="ocr_baidu_config">
                        <h2>百度 OCR 接口配置</h2>
                        <div class="row mt16 o_settings_container">
                            <!-- Enable/disable switch -->
                            <div class="col-12 col-lg-6 o_setting_box">
                                <div class="o_setting_left_pane">
                                    <field name="ocr_enabled"/>
                                </div>
                                <div class="o_setting_right_pane">
                                    <label for="ocr_enabled"/>
                                    <div class="text-muted">
                                        开启后,系统将自动识别附件 PDF 中的提单号
                                    </div>
                                </div>
                            </div>
                            <!-- Baidu API credentials (shown only when OCR is enabled) -->
                            <div class="col-12 col-lg-6 o_setting_box" attrs="{'invisible': [('ocr_enabled', '=', False)]}">
                                <div class="o_setting_right_pane">
                                    <div class="content-group">
                                        <div class="row mt16">
                                            <label for="baidu_ocr_app_id" string="App ID" class="col-lg-3 o_light_label"/>
                                            <field name="baidu_ocr_app_id"/>
                                        </div>
                                        <div class="row">
                                            <label for="baidu_ocr_api_key" string="API Key" class="col-lg-3 o_light_label"/>
                                            <field name="baidu_ocr_api_key"/>
                                        </div>
                                        <div class="row">
                                            <label for="baidu_ocr_secret_key" string="Secret Key" class="col-lg-3 o_light_label"/>
                                            <field name="baidu_ocr_secret_key" password="True"/>
                                        </div>
                                    </div>
                                </div>
                            </div>
                            <!-- Timeout and retry settings (shown only when OCR is enabled) -->
                            <div class="col-12 col-lg-6 o_setting_box" attrs="{'invisible': [('ocr_enabled', '=', False)]}">
                                <div class="o_setting_right_pane">
                                    <div class="text-muted">
                                        <label for="ocr_timeout" string="超时时间(秒)"/>
                                        <field name="ocr_timeout" style="width: 50px;"/>
                                    </div>
                                    <div class="text-muted">
                                        <label for="ocr_max_retries" string="最大重试次数"/>
                                        <field name="ocr_max_retries" style="width: 50px;"/>
                                    </div>
                                </div>
                            </div>
                        </div>
                    </div>
                </xpath>
            </field>
        </record>
    </data>
</odoo>
\ No newline at end of file
...@@ -105,7 +105,8 @@ class TT(models.Model): ...@@ -105,7 +105,8 @@ class TT(models.Model):
} }
request_url = tt_url + url request_url = tt_url + url
logging.info('request_url: %s' % request_url) logging.info('request_url: %s' % request_url)
logging.info('request_data: %s' % parameter) if 'clearance_file_feedback' not in request_url:
logging.info('request_data: %s' % parameter)
response = requests.post(request_url, headers=headers, data=parameter) response = requests.post(request_url, headers=headers, data=parameter)
logging.info('response: %s' % response.text) logging.info('response: %s' % response.text)
# response = {'code': 0} # response = {'code': 0}
......
...@@ -6,6 +6,7 @@ numpy ...@@ -6,6 +6,7 @@ numpy
Pillow Pillow
tesseract tesseract
pytesseract pytesseract
baidu-aip
# 系统依赖安装说明: # 系统依赖安装说明:
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论