提交 a0778f4e authored 作者: 贺阳

1、同步货站Pod和同步尾程的优化

上级 7b39af6c
......@@ -426,9 +426,9 @@
</field>
</record>
<!-- 获取尾程快递POD -->
<record id="bl_get_delivery_pod_info_server_action" model="ir.actions.server">
<field name="name">Batch Get Last Mile POD Info</field>
<!-- 下载货站提货POD-->
<record id="bl_download_pod_server_action" model="ir.actions.server">
<field name="name">Batch Download PickUp POD</field>
<field name="model_id" ref="model_cc_bl"/>
<field name="binding_model_id" ref="model_cc_bl"/>
<field name="state">code</field>
......@@ -436,13 +436,13 @@
<field name="groups_id" eval="[(4, ref('ccs_base.group_clearance_of_customs_user'))]"/>
<field name="code">
if records:
action = records.action_batch_get_last_mile_pod_info()
action = records.action_batch_download_pod('货站提货POD')
</field>
</record>
<!-- 下载货站提货POD-->
<record id="bl_download_pod_server_action" model="ir.actions.server">
<field name="name">Batch Download PickUp POD</field>
<!-- 获取尾程快递POD -->
<record id="bl_get_delivery_pod_info_server_action" model="ir.actions.server">
<field name="name">Batch Get Last Mile POD Info</field>
<field name="model_id" ref="model_cc_bl"/>
<field name="binding_model_id" ref="model_cc_bl"/>
<field name="state">code</field>
......@@ -450,7 +450,7 @@
<field name="groups_id" eval="[(4, ref('ccs_base.group_clearance_of_customs_user'))]"/>
<field name="code">
if records:
action = records.action_batch_download_pod('货站提货POD')
action = records.action_batch_get_last_mile_pod_info()
</field>
</record>
......
......@@ -60,10 +60,16 @@ class BatchGetLastMilePodInfoWizard(models.TransientModel):
def _get_bill_numbers(self, bl_objs):
_logger.info(f"开始预览操作,提单数量: {len(bl_objs)}")
start_time = time.time()
logging.info(f"开始预览操作,开始时间{len(bl_objs)}")
# 调用接口获取提单pdf文件
pdf_file_arr = self._get_pdf_file_arr(bl_objs)
end_time = time.time()
logging.info(f"获取PDF文件耗时: {end_time - start_time} 秒")
# 处理PDF文件,匹配提单对象
processed_files = self._match_bl_by_file_name(pdf_file_arr, bl_objs)
file_end_time = time.time()
logging.info(f"处理PDF文件耗时: {file_end_time - end_time} 秒")
# 把没有匹配到文件的进行提示
error_bl = []
matched_bl_ids = [f['bl'].id for f in processed_files if f.get('bl')]
......@@ -209,8 +215,10 @@ class BatchGetLastMilePodInfoWizard(models.TransientModel):
json=request_data,
timeout=30
)
logging.info('response:%s' % response)
if response.status_code == 200:
result = response.json()
logging.info('result:%s' % result)
# 检查API响应结构
if not result:
raise ValidationError(_('API returned empty response'))
......@@ -250,36 +258,57 @@ class BatchGetLastMilePodInfoWizard(models.TransientModel):
:param processed_files: 处理后的文件数组
:param fix_name:
"""
clearance_model = self.env['cc.clearance.file']
valid_entries = []
bl_ids = set()
for file_info in processed_files:
if not file_info.get('bl'):
bl = file_info.get('bl')
if not bl:
_logger.warning("跳过没有提单信息的文件")
continue
bl = file_info['bl']
file_name = file_info.get('file_name', '')
file_data = file_info.get('file_data', '')
if not file_data:
continue
valid_entries.append((file_info, bl, file_name, file_data))
bl_ids.add(bl.id)
if not valid_entries:
return
# 如果有文件为空的就回写,否则就创建新的清关文件记录
clearance_file = self.env['cc.clearance.file'].search(
[('bl_id', '=', bl.id), ('file_name', '=', fix_name), ('file', '=', False)], limit=1)
existing_clearance = clearance_model.search(
[('bl_id', 'in', list(bl_ids)), ('file_name', '=', fix_name), ('file', '=', False)]
)
existing_by_bl = {rec.bl_id.id: rec for rec in existing_clearance}
create_vals_list = []
create_infos = []
for file_info, bl, file_name, file_data in valid_entries:
clearance_file = existing_by_bl.get(bl.id)
if clearance_file:
clearance_file.write({
'attachment_name': file_name,
'file': file_data
})
_logger.info(f"更新清关文件记录: 提单 {bl.bl_no}")
file_info['clearance_file'] = clearance_file
else:
# 创建新的清关文件记录
clearance_file = self.env['cc.clearance.file'].create({
create_vals_list.append({
'bl_id': bl.id,
'file_name': fix_name,
'attachment_name': file_name,
'file': file_data
})
create_infos.append(file_info)
if create_vals_list:
new_records = clearance_model.create(create_vals_list)
for clearance_file, file_info in zip(new_records, create_infos):
bl = file_info['bl']
_logger.info(f"创建新的清关文件记录: 提单 {bl.bl_no}")
file_info['clearance_file'] = clearance_file
file_info['clearance_file'] = clearance_file
def _merge_pdf_files(self, processed_files):
"""
......@@ -326,61 +355,32 @@ class BatchGetLastMilePodInfoWizard(models.TransientModel):
_logger.info(f"单个PDF文件直接保存: {pdf_filename}")
return
# 多个PDF文件需要合并
_logger.info(f"开始合并 {len(valid_files)} 个PDF文件")
# 使用临时文件方式合并,避免内存占用过大
temp_file_path = tempfile.mktemp(suffix='.pdf')
merged_pdf = fitz.open()
bl_numbers = []
# 遍历所有处理后的PDF文件,分批处理以减少内存占用
batch_size = 5 # 每批处理5个PDF
for batch_start in range(0, len(valid_files), batch_size):
batch_files = valid_files[batch_start:batch_start + batch_size]
_logger.info(f"处理第 {batch_start // batch_size + 1} 批,共 {len(batch_files)} 个PDF")
for file_info in batch_files:
bl = file_info['bl']
bl_no = bl.bl_no
file_data = file_info['file_data']
bl_numbers.append(bl_no)
source_pdf = None
try:
# 将base64数据转换为二进制
pdf_binary = base64.b64decode(file_data)
# 打开PDF文档
source_pdf = fitz.open(stream=pdf_binary, filetype="pdf")
# 将源PDF的所有页面插入到合并的PDF中
merged_pdf.insert_pdf(source_pdf)
_logger.info(f"已添加提单 {bl_no} 的PDF到合并文档({len(source_pdf)} 页)")
except Exception as e:
_logger.error(f"合并提单 {bl_no} 的PDF失败: {str(e)}")
continue
finally:
# 立即释放资源
if source_pdf:
source_pdf.close()
gc.collect() # 强制垃圾回收
# 每批处理完后,保存到临时文件并释放内存
if batch_start + batch_size < len(valid_files):
# 保存当前合并结果到临时文件
merged_pdf.save(temp_file_path, garbage=4, deflate=True, clean=True)
merged_pdf.close()
# 重新打开临时文件继续合并
merged_pdf = fitz.open(temp_file_path)
for file_info in valid_files:
bl = file_info['bl']
bl_no = bl.bl_no
file_data = file_info['file_data']
bl_numbers.append(bl_no)
source_pdf = None
try:
pdf_binary = base64.b64decode(file_data)
source_pdf = fitz.open(stream=pdf_binary, filetype="pdf")
merged_pdf.insert_pdf(source_pdf)
_logger.info(f"已添加提单 {bl_no} 的PDF到合并文档({len(source_pdf)} 页)")
except Exception as e:
_logger.error(f"合并提单 {bl_no} 的PDF失败: {str(e)}")
continue
finally:
if source_pdf:
source_pdf.close()
gc.collect()
# 如果有页面,保存合并后的PDF
if len(merged_pdf) > 0:
# 使用临时文件保存,减少内存占用
if not temp_file_path:
temp_file_path = tempfile.mktemp(suffix='.pdf')
merged_pdf.save(temp_file_path, garbage=4, deflate=True, clean=True)
merged_pdf.close()
......@@ -458,107 +458,36 @@ class BatchGetLastMilePodInfoWizard(models.TransientModel):
Sync last mile POD information
:param processed_files: 处理后的文件数组
"""
is_fail = [] # 同步失败
redis_conn = self.env['common.common'].sudo().get_redis()
if not redis_conn or redis_conn == 'no':
raise ValidationError('未连接redis,无法同步尾程POD,请联系管理员')
bl_ids = []
for file_info in processed_files:
if not file_info['bl']:
bl = file_info.get('bl')
if not bl:
continue
bl = file_info['bl']
if bl.bl_type == 'temu':
continue
# 查找清关文件并执行同步
clearance_file = file_info.get('clearance_file')
if clearance_file:
try:
clearance_file.action_sync() # 同步尾程交接POD
except Exception as e:
logging.info('_sync_last_mile_pod:%s' % e)
is_fail = True
break
_logger.info(f"Successfully synced POD for BL {bl.bl_no}")
if is_fail:
raise ValidationError('本次同步失败,请重试!')
def get_date_sync_match_node(self, processed_files):
"""
Sync matched node based on POD file, extract time from red boxes # 根据POD文件同步匹配节点
:param processed_files: 处理后的文件数组
"""
ship_packages, pod_node_id = self.get_detail_info(processed_files)
self._sync_match_node(ship_packages, pod_node_id)
def get_detail_info(self, processed_files):
"""
获取提单对应的节点以及时间
:param processed_files: 处理后的文件数组(应该已经包含valid_packages字段,只包含满足条件的小包)
:return: 提单对应的节点以及节点操作时间
"""
ship_packages = []
# 查找对应的清关节点(勾选了POD节点匹配的节点)
pod_node = self.env['cc.node'].search([
('is_pod_node', '=', True),
('node_type', '=', 'package')
], limit=1)
for file_info in processed_files:
if not file_info.get('bl'):
continue
bl = file_info['bl']
if not pod_node:
continue
# 只使用满足条件的小包(经过验证的valid_packages)
valid_packages = file_info.get('valid_packages', [])
if not valid_packages:
_logger.warning(f"提单 {bl.bl_no} 没有满足条件的小包,跳过节点推送")
continue
# 从valid_packages中提取小包ID(记录集对象或列表)
if hasattr(valid_packages, 'ids'):
# 如果是记录集对象,直接获取IDs
valid_package_ids = valid_packages.ids
elif isinstance(valid_packages, list):
# 如果是列表,提取每个对象的ID
valid_package_ids = [p.id for p in valid_packages if hasattr(p, 'id')]
else:
_logger.warning(f"提单 {bl.bl_no} valid_packages格式不正确: {type(valid_packages)}")
valid_package_ids = []
_logger.info(f"提单 {bl.bl_no} 满足条件的小包ID: {valid_package_ids} (共 {len(valid_package_ids)} 个)")
if not valid_package_ids:
_logger.warning(f"提单 {bl.bl_no} 满足条件的小包ID为空,跳过节点推送")
continue
# 从PDF文件提取红色框的时间
file_data = file_info.get('file_data')
if not file_data:
logging.info(f"提单 {bl.bl_no} 没有文件数据")
if not clearance_file:
continue
bl_ids.append(bl.id)
ship_packages.append({
'bl_id': bl.id,
'id': valid_package_ids, # 只包含满足条件的小包ID
'tally_time': str(file_info.get('tally_time'))
})
return ship_packages, pod_node.id
if not bl_ids:
return
def _sync_match_node(self, ship_packages, pod_node_id):
"""
同步匹配节点
:param ship_packages: 提单对应的小包以及节点信息
:param pod_node_id: 尾程POD节点匹配的节点ID
"""
# 若该提单里已有对应的小包已有节点推送日志,则不再重新推送;
_logger.info(f"同步匹配节点,提单: {ship_packages}, 节点: {pod_node_id}")
if ship_packages:
bl_objs = self.env['cc.bl'].sudo().search(
[('id', 'in', [ship_package.get('bl_id') for ship_package in ship_packages])])
redis_conn = self.env['common.common'].sudo().get_redis()
if redis_conn and redis_conn != 'no' and pod_node_id:
redis_conn.lpush('mail_push_package_list', json.dumps(
{'ids': bl_objs.ids, 'ship_packages': str(ship_packages), 'action_type': 'push_match_node',
'user_login': self.env.user.login,
'pod_node_id': pod_node_id}))
payload = {
'ids': bl_ids,
'action_type': 'sync_last_mile_pod',
'user_login': self.env.user.login,
'file_type': '尾程交接POD(待大包数量和箱号)'
}
try:
redis_conn.lpush('mail_push_package_list', json.dumps(payload))
except Exception as e:
logging.error('sync_last_mile_pod redis error:%s' % e)
raise ValidationError('推送尾程POD同步任务到redis失败,请重试或联系管理员')
def _cleanup_temp_attachments(self, bl_objs=None):
"""
......
......@@ -349,7 +349,6 @@ class BatchGetPodInfoWizard(models.TransientModel):
if action_type == '获取货站提货POD信息':
if self.sync_last_mile_pod and successful_processed_files:
self._sync_last_mile_pod(successful_processed_files)
# 同步推送匹配节点
if self.sync_match_node and successful_processed_files:
# 且需先对比小包当前节点的操作时间是否小于提取时间(同时区对比)若大于则不能推送,
......@@ -726,25 +725,34 @@ class BatchGetPodInfoWizard(models.TransientModel):
Sync pickup POD information # 同步货站提货POD信息
:param processed_files: 处理后的文件数组
"""
# return False#测试 先不同步
# 同步货站提货POD信息
is_fail = [] # 同步失败
redis_conn = self.env['common.common'].sudo().get_redis()
if not redis_conn or redis_conn == 'no':
raise ValidationError('未连接redis,无法同步货站提货POD,请联系管理员')
bl_ids = []
for file_info in processed_files:
if not file_info['bl']:
bl = file_info.get('bl')
if not bl:
continue
bl = file_info['bl']
# 查找清关文件并执行同步
clearance_file = file_info.get('clearance_file')
if clearance_file:
try:
clearance_file.action_sync() # 同步货站提货POD
except Exception as e:
logging.info('_sync_last_mile_pod:%s' % e)
is_fail = True
break
_logger.info(f"Successfully synced POD for BL {bl.bl_no}")
if is_fail:
raise ValidationError('本次同步失败,请重试!')
if not clearance_file:
continue
bl_ids.append(bl.id)
if not bl_ids:
return
payload = {
'ids': bl_ids,
'action_type': 'sync_last_mile_pod',
'user_login': self.env.user.login,
'file_type': '货站提货POD'
}
try:
redis_conn.lpush('mail_push_package_list', json.dumps(payload))
except Exception as e:
logging.info('sync_last_mile_pod redis error:%s' % e)
raise ValidationError('推送货站提货POD同步任务到redis失败,请重试或联系管理员')
def _check_target_texts_exist(self, pdf_binary, bl_no):
"""
......
......@@ -31,8 +31,11 @@ class Order_dispose(object):
try:
data = json.loads(data)
logging.info('mail_push_data: %s', data)
action_type = data.get('action_type')
if action_type == 'sync_last_mile_pod':
self._sync_last_mile_pod_from_queue(data)
return res_data
ship_packages = eval(data['ship_packages']) if data.get('ship_packages') else [] # 小包
action_type = data.get('action_type') # 类型
utc_time = data.get('utc_time')
bl_obj = self.odoo_db.env['cc.bl']
if action_type and not utc_time:
......@@ -52,6 +55,32 @@ class Order_dispose(object):
logging.error('mail_auto_push error:%s' % str(ex))
return res_data
def _sync_last_mile_pod_from_queue(self, data):
    """Consume one 'sync_last_mile_pod' task taken from the redis queue.

    :param data: decoded queue payload; reads 'ids' (cc.bl record ids to
                 process) and 'file_type' (the clearance ``file_name`` whose
                 records should be synced).
    :return: None. Errors are logged, never raised, so the queue consumer
             keeps running.
    """
    # assumes self.odoo_db is an RPC-style Odoo connection whose
    # search() returns ids for browse() — TODO confirm against Order_dispose setup
    bl_ids = data.get('ids') or []
    if not bl_ids:
        return
    try:
        bl_model = self.odoo_db.env['cc.bl']
        clearance_model = self.odoo_db.env['cc.clearance.file']
        bl_records = bl_model.browse(bl_ids)
        # TEMU bills are excluded from this sync
        non_temu_ids = [bl.id for bl in bl_records if getattr(bl, 'bl_type', False) != 'temu']
        if not non_temu_ids:
            return
        # Only clearance files whose file_name matches the payload's file_type
        clearance_ids = clearance_model.search([
            ('bl_id', 'in', non_temu_ids),
            ('file_name', '=', data.get('file_type')),
        ])
        if not clearance_ids:
            return
        clearance_records = clearance_model.browse(clearance_ids)
        for clearance_file in clearance_records:
            try:
                clearance_file.action_sync()
            except Exception as ex:
                # one failed file must not abort the remaining syncs
                logging.error('sync_last_mile_pod action_sync error:%s' % str(ex))
    except Exception as ex:
        # last-resort guard: a bad payload/connection is logged, not raised
        logging.error('sync_last_mile_pod_from_queue error:%s' % str(ex))
try:
pool = redis.ConnectionPool(**config.redis_options)
......
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论