提交 55556065 authored 作者: 刘擎阳's avatar 刘擎阳

Merge branch 'develop' of https://e.coding.net/yizuo/hh_ccs/hh_ccs into develop

...@@ -16,7 +16,7 @@ class CommonCommon(models.Model): ...@@ -16,7 +16,7 @@ class CommonCommon(models.Model):
_name = 'common.common' _name = 'common.common'
_description = u'公用基础类' _description = u'公用基础类'
def get_local_time(self, local_time=None): def get_local_time(self, local_time=None, user_obj=False):
"""获取Odoo时区的时间 """获取Odoo时区的时间
Args: Args:
local_time: 本地时间,如果不提供则使用当前时间 local_time: 本地时间,如果不提供则使用当前时间
...@@ -27,7 +27,9 @@ class CommonCommon(models.Model): ...@@ -27,7 +27,9 @@ class CommonCommon(models.Model):
local_time = datetime.datetime.now() local_time = datetime.datetime.now()
try: try:
# 获取Odoo配置的时区 # 获取Odoo配置的时区
user_tz = self.env.user.tz or 'UTC' if not user_obj:
user_obj = self.env.user
user_tz = user_obj.tz or 'UTC'
timezone_offset = self.env['common.common'].sudo().get_time_zone(user_tz) timezone_offset = self.env['common.common'].sudo().get_time_zone(user_tz)
local_time = local_time + datetime.timedelta(hours=int(timezone_offset)) local_time = local_time + datetime.timedelta(hours=int(timezone_offset))
return local_time.strftime('%Y-%m-%d %H:%M:%S'), timezone_offset return local_time.strftime('%Y-%m-%d %H:%M:%S'), timezone_offset
...@@ -36,22 +38,7 @@ class CommonCommon(models.Model): ...@@ -36,22 +38,7 @@ class CommonCommon(models.Model):
logging.warning(f"获取Odoo时区失败,使用UTC时间: {str(e)}") logging.warning(f"获取Odoo时区失败,使用UTC时间: {str(e)}")
return self.get_utc_time(), '+0' return self.get_utc_time(), '+0'
def get_timezone_offset(self): def get_local_rfc3339_time(self, local_time=None, user_obj=False):
"""
Get UTC offset from Odoo's timezone configuration
Returns format like: UTC+8, UTC-5, etc.
"""
try:
tz = self.env.user.tz or 'UTC'
if self.env.user.name == 'OdooBot':
tz = 'Asia/Shanghai'
user_tz = int(self.init_timezone_data(tz))
return user_tz
except Exception as e:
_logger.error("Timezone offset calculation error: %s", str(e))
return "UTC+0" # Default t
def get_local_rfc3339_time(self, local_time=None):
"""获取Odoo本地时区的RFC3339格式时间 """获取Odoo本地时区的RFC3339格式时间
Args: Args:
local_time: 本地时间,如果不提供则使用当前时间 local_time: 本地时间,如果不提供则使用当前时间
...@@ -61,7 +48,11 @@ class CommonCommon(models.Model): ...@@ -61,7 +48,11 @@ class CommonCommon(models.Model):
if not local_time: if not local_time:
local_time = datetime.datetime.now() local_time = datetime.datetime.now()
# 获取Odoo配置的时区 # 获取Odoo配置的时区
user_tz = self.env.user.tz or 'UTC' if not user_obj:
user_obj = self.env.user
user_tz = user_obj.tz or 'UTC'
timezone_offset = self.env['common.common'].sudo().get_time_zone(user_tz)
local_time = local_time + datetime.timedelta(hours=int(timezone_offset))
local_tz = pytz.timezone(user_tz) local_tz = pytz.timezone(user_tz)
# 确保时间是本地时区 # 确保时间是本地时区
if local_time.tzinfo is None: if local_time.tzinfo is None:
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# © <2016> <ToproERP hy> # © <2016> <ToproERP hy>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from datetime import datetime
from odoo import models, fields, api, _
from odoo.exceptions import ValidationError, Warning
import logging
import json
import xlrd
import base64 import base64
import pdfplumber import json
from io import BytesIO import logging
from datetime import datetime, timedelta
import re import re
from datetime import datetime, timedelta
from io import BytesIO
import pdfplumber
import xlrd
from odoo import models
from odoo.exceptions import ValidationError
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
import html import html
...@@ -106,8 +107,9 @@ class OrderStateChangeRule(models.Model): ...@@ -106,8 +107,9 @@ class OrderStateChangeRule(models.Model):
return data_arr return data_arr
def upload_cds_attachment(self, bl_obj, name, data): def upload_cds_attachment(self, bl_obj, name, data):
file_obj = self.env['cc.clearance.file'].sudo().search([('file_name', '=', '海关CDS申报单(import和授权方式检查拉齐等)'), file_obj = self.env['cc.clearance.file'].sudo().search(
('bl_id', '=', bl_obj.id)], limit=1) [('file_name', '=', '海关CDS申报单(import和授权方式检查拉齐等)'),
('bl_id', '=', bl_obj.id)], limit=1)
file_obj.file = base64.encodebytes(data) file_obj.file = base64.encodebytes(data)
file_obj.attachment_name = name file_obj.attachment_name = name
file_obj.is_upload = False file_obj.is_upload = False
...@@ -116,7 +118,7 @@ class OrderStateChangeRule(models.Model): ...@@ -116,7 +118,7 @@ class OrderStateChangeRule(models.Model):
def upload_pod_attachment(self, bl_obj, name, data): def upload_pod_attachment(self, bl_obj, name, data):
"""尾程交接POD(待大包数量和箱号) 文件上传与同步""" """尾程交接POD(待大包数量和箱号) 文件上传与同步"""
file_objs = self.env['cc.clearance.file'].sudo().search([('file_name', '=', '尾程交接POD(待大包数量和箱号)'), file_objs = self.env['cc.clearance.file'].sudo().search([('file_name', '=', '尾程交接POD(待大包数量和箱号)'),
('bl_id', 'in', bl_obj.ids)]) ('bl_id', 'in', bl_obj.ids)])
# 最大重试次数 # 最大重试次数
max_retries = 2 max_retries = 2
...@@ -165,7 +167,8 @@ class OrderStateChangeRule(models.Model): ...@@ -165,7 +167,8 @@ class OrderStateChangeRule(models.Model):
timezone_offset = self.env['common.common'].sudo().get_time_zone(tz) timezone_offset = self.env['common.common'].sudo().get_time_zone(tz)
# print(timezone_offset) # print(timezone_offset)
utc_time = local_time - timedelta(hours=int(timezone_offset)) utc_time = local_time - timedelta(hours=int(timezone_offset))
sql = "select id from cc_bl where UPPER(REPLACE(REPLACE(REPLACE(bl_no, ' ', ''), '-', ''), '/', '')) = '{0}' order by create_date desc limit 1".format(order_no.replace(' ', '').replace('-', '').replace('/', '')) sql = "select id from cc_bl where UPPER(REPLACE(REPLACE(REPLACE(bl_no, ' ', ''), '-', ''), '/', '')) = '{0}' order by create_date desc limit 1".format(
order_no.replace(' ', '').replace('-', '').replace('/', ''))
self._cr.execute(sql) self._cr.execute(sql)
result = self._cr.fetchall() result = self._cr.fetchall()
bl_obj = self.env['cc.bl'].sudo().search([('id', '=', result[0][0])]) if result else False bl_obj = self.env['cc.bl'].sudo().search([('id', '=', result[0][0])]) if result else False
...@@ -177,7 +180,9 @@ class OrderStateChangeRule(models.Model): ...@@ -177,7 +180,9 @@ class OrderStateChangeRule(models.Model):
if redis_conn == 'no': if redis_conn == 'no':
raise ValidationError('未连接redis') raise ValidationError('未连接redis')
else: else:
redis_conn.lpush('mail_push_package_list', json.dumps({'id': bl_obj.id, 'utc_time': utc_time.strftime("%Y-%m-%d %H:%M:%S")})) redis_conn.lpush('mail_push_package_list', json.dumps(
{'id': bl_obj.id, 'user_login': users_obj.login,
'utc_time': utc_time.strftime("%Y-%m-%d %H:%M:%S")}))
except Exception as err: except Exception as err:
logging.error('fetch_mail_dlv_attachment--error:%s' % str(err)) logging.error('fetch_mail_dlv_attachment--error:%s' % str(err))
...@@ -254,7 +259,8 @@ class OrderStateChangeRule(models.Model): ...@@ -254,7 +259,8 @@ class OrderStateChangeRule(models.Model):
# 调整时区 # 调整时区
utc_time = local_time - timedelta(hours=timezone_offset) utc_time = local_time - timedelta(hours=timezone_offset)
sql = "select id from cc_bl where UPPER(REPLACE(REPLACE(REPLACE(bl_no, ' ', ''), '-', ''), '/', '')) = '{0}' " \ sql = "select id from cc_bl where UPPER(REPLACE(REPLACE(REPLACE(bl_no, ' ', ''), '-', ''), '/', '')) = '{0}' " \
"and transport_tool_name='{1}' order by create_date desc limit 1".format(order_no.replace(' ', '').replace('-', '').replace('/', ''), voyage_name) "and transport_tool_name='{1}' order by create_date desc limit 1".format(
order_no.replace(' ', '').replace('-', '').replace('/', ''), voyage_name)
self._cr.execute(sql) self._cr.execute(sql)
result = self._cr.fetchall() result = self._cr.fetchall()
print(result) print(result)
...@@ -265,7 +271,8 @@ class OrderStateChangeRule(models.Model): ...@@ -265,7 +271,8 @@ class OrderStateChangeRule(models.Model):
if redis_conn == 'no': if redis_conn == 'no':
raise ValidationError('未连接redis') raise ValidationError('未连接redis')
else: else:
redis_conn.lpush('mail_push_package_list', json.dumps({'id': bl_obj.id, 'utc_time': utc_time.strftime("%Y-%m-%d %H:%M:%S")})) redis_conn.lpush('mail_push_package_list', json.dumps(
{'id': bl_obj.id, 'utc_time': utc_time.strftime("%Y-%m-%d %H:%M:%S")}))
except Exception as err: except Exception as err:
logging.error('fetch_mail_dlv--error:%s' % str(err)) logging.error('fetch_mail_dlv--error:%s' % str(err))
......
...@@ -3,7 +3,7 @@ import asyncio ...@@ -3,7 +3,7 @@ import asyncio
import logging import logging
import ssl import ssl
from datetime import timedelta, datetime from datetime import timedelta, datetime
import json
import aiohttp import aiohttp
import certifi import certifi
import pytz import pytz
...@@ -180,10 +180,10 @@ class CcShipPackage(models.Model): ...@@ -180,10 +180,10 @@ class CcShipPackage(models.Model):
record.process_time.strftime( record.process_time.strftime(
'%Y-%m-%d %H:%M:%S')) '%Y-%m-%d %H:%M:%S'))
def get_callback_track_data(self): def get_callback_track_data(self, user_obj=False):
"""小包上传数据.""" """小包上传数据."""
# 获取该提单下的所有同步状态为未同步的小包 # 获取该提单下的所有同步状态为未同步的小包
operate_time, timezone = self.env['common.common'].get_local_time(self.process_time) operate_time, timezone = self.env['common.common'].get_local_time(self.process_time, user_obj)
push_data = { push_data = {
"provider_order_id": self.logistic_order_no, "provider_order_id": self.logistic_order_no,
"track_list": [ "track_list": [
...@@ -202,9 +202,9 @@ class CcShipPackage(models.Model): ...@@ -202,9 +202,9 @@ class CcShipPackage(models.Model):
# logging.info('小包轨迹 push_data:%s' % push_data) # logging.info('小包轨迹 push_data:%s' % push_data)
return push_data return push_data
def callback_track(self, is_push=True): def callback_track(self, is_push=True, user_obj=False):
if not self.is_sync and self.state and self.state.tk_code: if not self.is_sync and self.state and self.state.tk_code:
data = self.get_callback_track_data() data = self.get_callback_track_data(user_obj)
if is_push: if is_push:
tt_api_obj = self.env["ao.tt.api"].sudo() tt_api_obj = self.env["ao.tt.api"].sudo()
response = tt_api_obj.callback_track(data) response = tt_api_obj.callback_track(data)
...@@ -363,7 +363,7 @@ class CcBl(models.Model): ...@@ -363,7 +363,7 @@ class CcBl(models.Model):
self.ship_package_ids) and self.unsync_package_count <= 0 and self.customs_clearance_status.is_done and self.is_bl_sync: self.ship_package_ids) and self.unsync_package_count <= 0 and self.customs_clearance_status.is_done and self.is_bl_sync:
self.done_func() self.done_func()
def change_customs_state_by_ship_package(self, package_state_obj): def change_customs_state_by_ship_package(self, package_state_obj, user_obj=False):
""" """
根据小包的状态修改提单关务状态以及生成同步日志 根据小包的状态修改提单关务状态以及生成同步日志
:param package_state_obj:小包更新后的状态 :param package_state_obj:小包更新后的状态
...@@ -393,7 +393,7 @@ class CcBl(models.Model): ...@@ -393,7 +393,7 @@ class CcBl(models.Model):
bl.is_bl_sync = before_node.node_is_sync() bl.is_bl_sync = before_node.node_is_sync()
self._cr.commit() self._cr.commit()
# 调用同步关务提单状态 # 调用同步关务提单状态
is_ok = bl.callback_track_bl() is_ok = bl.callback_track_bl(user_obj)
if not is_ok: if not is_ok:
break break
if is_ok: if is_ok:
...@@ -402,7 +402,7 @@ class CcBl(models.Model): ...@@ -402,7 +402,7 @@ class CcBl(models.Model):
bl.is_bl_sync = state_obj.node_is_sync() bl.is_bl_sync = state_obj.node_is_sync()
self._cr.commit() self._cr.commit()
# 调用同步提单状态 # 调用同步提单状态
bl.callback_track_bl() bl.callback_track_bl(user_obj)
# 如果提单有小包变成了清关开始,提单状态变为清关中;如果提单所有小包的清关节点变成"是完成节点",则该提单状态变成已完成 # 如果提单有小包变成了清关开始,提单状态变为清关中;如果提单所有小包的清关节点变成"是完成节点",则该提单状态变成已完成
bl.change_state_by_ship_package() bl.change_state_by_ship_package()
...@@ -445,13 +445,17 @@ class CcBl(models.Model): ...@@ -445,13 +445,17 @@ class CcBl(models.Model):
# =============同步提单状态================================== # =============同步提单状态==================================
# 定义一个方法, 获取提单,并回传提单状态 # 定义一个方法, 获取提单,并回传提单状态
def callback_track_bl(self):
def action_callback_track_bl(self):
self.callback_track_bl()
def callback_track_bl(self, user_obj=False):
is_ok = True is_ok = True
for item in self: for item in self:
is_ok = item.bl_callback_func(item.ids) is_ok = item.bl_callback_func(item.ids, user_obj)
return is_ok return is_ok
def bl_callback_func(self, bl_ids): def bl_callback_func(self, bl_ids, user_obj=False):
""" """
同步提单状态 同步提单状态
""" """
...@@ -467,7 +471,7 @@ class CcBl(models.Model): ...@@ -467,7 +471,7 @@ class CcBl(models.Model):
tasks = [] tasks = []
for index, bl in enumerate(bls): for index, bl in enumerate(bls):
if not bl.is_bl_sync and bl.customs_clearance_status and bl.customs_clearance_status.tk_code: if not bl.is_bl_sync and bl.customs_clearance_status and bl.customs_clearance_status.tk_code:
data = bl.get_bl_callback_track_data() data = bl.get_bl_callback_track_data(user_obj)
tasks.append(tt_api_obj.async_bl_callback_track(session, data, bl.id)) tasks.append(tt_api_obj.async_bl_callback_track(session, data, bl.id))
responses = await asyncio.gather(*tasks) responses = await asyncio.gather(*tasks)
return responses return responses
...@@ -503,40 +507,45 @@ class CcBl(models.Model): ...@@ -503,40 +507,45 @@ class CcBl(models.Model):
bl_order.bl_no or '', '', data, 0, request_id, source='推出') bl_order.bl_no or '', '', data, 0, request_id, source='推出')
return is_ok return is_ok
def get_bl_callback_track_data(self): def get_bl_callback_track_data(self, user_obj=False):
""" """
获取提单回传数据 获取提单回传数据
""" """
return { push_data = {
"master_waybill_no": self.bl_no or '', # 主提运单号 "master_waybill_no": self.bl_no or '', # 主提运单号
"customs_waybill_id": self.customs_bl_no or '', # 关务提单号取海关装货单号 "customs_waybill_id": self.customs_bl_no or '', # 关务提单号取海关装货单号
"track_detail": "track_detail":
{ {
"operate_time": self.env['common.common'].get_local_rfc3339_time(self.process_time), "operate_time": self.env['common.common'].get_local_rfc3339_time(self.process_time, user_obj),
# "2025-03-19T09:28:00+00:00", # "2025-03-19T09:28:00+00:00",
# get_utc_rfc339_time(self.process_time), # "2025-03-19T09:28:00+00:00", "#get_utc_time(self.process_time), # 事件发⽣时间,RFC3339格式(实操时间) # get_utc_rfc339_time(self.process_time), # "2025-03-19T09:28:00+00:00", "#get_utc_time(self.process_time), # 事件发⽣时间,RFC3339格式(实操时间)
"waybill_status_code": self.customs_clearance_status.tk_code, # 提单关务状态编码 "waybill_status_code": self.customs_clearance_status.tk_code, # 提单关务状态编码
} }
} }
logging.info('关务提单 push_data:%s' % push_data)
return push_data
# =============同步小包状态================================== # =============同步小包状态==================================
# 定义一个方法, 获取提单下的所有未同步的小包,并回传小包状态 # 定义一个方法, 获取提单下的所有未同步的小包,并回传小包状态
def callback_track(self): def action_callback_track(self):
self.callback_track()
def callback_track(self, user_obj=False):
is_ok = True is_ok = True
for item in self: for item in self:
ship_packages = self.env['cc.ship.package'].search([('bl_id', '=', item.id), ('is_sync', '=', False)]) ship_packages = self.env['cc.ship.package'].search([('bl_id', '=', item.id), ('is_sync', '=', False)])
is_ok = item.package_callback_func(ship_packages.ids) is_ok = item.package_callback_func(ship_packages.ids, user_obj=user_obj)
# 根据小包状态更新提单关务状态 # 根据小包状态更新提单关务状态
try: try:
if is_ok and item.ship_package_ids: if is_ok and item.ship_package_ids:
item.change_customs_state_by_ship_package(item.ship_package_ids[0].state) item.change_customs_state_by_ship_package(item.ship_package_ids[0].state, user_obj=user_obj)
except Exception as e: except Exception as e:
logging.info('change_customs_state_by_ship_package error:%s' % e) logging.info('change_customs_state_by_ship_package error:%s' % e)
return is_ok return is_ok
def package_callback_func(self, ship_package_ids): def package_callback_func(self, ship_package_ids, user_obj=False):
""" """
同步小包状态 同步小包状态
""" """
...@@ -552,7 +561,7 @@ class CcBl(models.Model): ...@@ -552,7 +561,7 @@ class CcBl(models.Model):
tasks = [] tasks = []
for index, package in enumerate(ship_packages): for index, package in enumerate(ship_packages):
if not package.is_sync and package.state and package.state.tk_code: if not package.is_sync and package.state and package.state.tk_code:
data = package.get_callback_track_data() data = package.get_callback_track_data(user_obj)
tasks.append(tt_api_obj.async_callback_track(session, data, package.id)) tasks.append(tt_api_obj.async_callback_track(session, data, package.id))
responses = await asyncio.gather(*tasks) responses = await asyncio.gather(*tasks)
return responses return responses
...@@ -794,22 +803,23 @@ class CcBl(models.Model): ...@@ -794,22 +803,23 @@ class CcBl(models.Model):
lambda r: r.bl_no.replace('-', '').replace(' ', '').lower() == processed_bl_no) # 提单 lambda r: r.bl_no.replace('-', '').replace(' ', '').lower() == processed_bl_no) # 提单
return bl_obj return bl_obj
def try_callback_track(self, max_retries=3, ship_package_ids=[]): def try_callback_track(self, max_retries=3, ship_package_ids=[], user_obj=False):
""" 封装的重试逻辑 """ """ 封装的重试逻辑 """
for i in range(max_retries): for i in range(max_retries):
if not ship_package_ids: if not ship_package_ids:
is_ok = self.callback_track() is_ok = self.callback_track(user_obj=user_obj)
else: else:
is_ok = self.package_callback_func(ship_package_ids) is_ok = self.package_callback_func(ship_package_ids, user_obj=user_obj)
if is_ok: if is_ok:
# 根据小包状态更新提单关务状态 # 根据小包状态更新提单关务状态
if self.ship_package_ids: if self.ship_package_ids:
self.change_customs_state_by_ship_package(self.ship_package_ids[0].state) self.change_customs_state_by_ship_package(self.ship_package_ids[0].state, user_obj=user_obj)
return is_ok return is_ok
logging.warning(f"Attempt {i + 1}/{max_retries} failed. Retrying...") logging.warning(f"Attempt {i + 1}/{max_retries} failed. Retrying...")
return False return False
def mail_auto_push(self, mail_time=False, ship_packages=[], action_type='tally'): def mail_auto_push(self, mail_time=False, ship_packages=[], action_type='tally', mail_db_user='邮件接收',
pda_db_user='pda'):
self = self.with_context(dict(self._context, is_mail=True)) self = self.with_context(dict(self._context, is_mail=True))
for item in self: for item in self:
try: try:
...@@ -818,15 +828,17 @@ class CcBl(models.Model): ...@@ -818,15 +828,17 @@ class CcBl(models.Model):
before_min = self.env['ir.config_parameter'].sudo().get_param('before_min') or 10 before_min = self.env['ir.config_parameter'].sudo().get_param('before_min') or 10
before_utc_time = utc_time - timedelta(minutes=int(before_min)) before_utc_time = utc_time - timedelta(minutes=int(before_min))
item.push_clear_customs_start(before_utc_time) item.push_clear_customs_start(before_utc_time)
user_obj = self.env['res.users'].search([('login', '=', mail_db_user)], limit=1)
# 尝试调用 callback_track # 尝试调用 callback_track
if self.try_callback_track(): if self.try_callback_track(user_obj=user_obj):
item.push_clear_customs_end(utc_time) item.push_clear_customs_end(utc_time)
# 再次尝试调用 callback_track # 再次尝试调用 callback_track
if not self.try_callback_track(): if not self.try_callback_track(user_obj=user_obj):
logging.error(f"Failed to push item after {3} attempts.") logging.error(f"Failed to push item after {3} attempts.")
else: else:
logging.error(f"Failed to start process for item after {3} attempts.") logging.error(f"Failed to start process for item after {3} attempts.")
elif ship_packages: elif ship_packages:
user_obj = self.env['res.users'].search([('login', '=', pda_db_user)], limit=1)
ship_package_ids = [ship_package_dict for sublist in [d['id'] for d in ship_packages] for ship_package_ids = [ship_package_dict for sublist in [d['id'] for d in ship_packages] for
ship_package_dict in sublist] ship_package_dict in sublist]
tally_state = 'checked_goods' if action_type == 'tally' else 'handover_completed' tally_state = 'checked_goods' if action_type == 'tally' else 'handover_completed'
...@@ -901,7 +913,8 @@ class CcBl(models.Model): ...@@ -901,7 +913,8 @@ class CcBl(models.Model):
self.env.cr.execute(sql) self.env.cr.execute(sql)
self._cr.commit() # 提交事务 self._cr.commit() # 提交事务
self.try_callback_track(max_retries=2, ship_package_ids=ship_package_ids) self.try_callback_track(max_retries=2, ship_package_ids=ship_package_ids,
user_obj=user_obj)
# 理货或尾程交接的节点 # 理货或尾程交接的节点
# 预先获取所有状态节点 # 预先获取所有状态节点
all_state_nodes = self.env['cc.node'].sudo().search([ all_state_nodes = self.env['cc.node'].sudo().search([
...@@ -946,7 +959,8 @@ class CcBl(models.Model): ...@@ -946,7 +959,8 @@ class CcBl(models.Model):
self.env.cr.execute(sql) self.env.cr.execute(sql)
self._cr.commit() # 提交事务 self._cr.commit() # 提交事务
self.try_callback_track(max_retries=2, ship_package_ids=ship_package_ids) self.try_callback_track(max_retries=2, ship_package_ids=ship_package_ids,
user_obj=user_obj)
return True return True
except Exception as err: except Exception as err:
......
import asyncio
import logging
import ssl
from datetime import timedelta, datetime
import aiohttp
import certifi
import pytz
from odoo import models, fields, api, _
def get_rfc339_time(utc_time=None):
if not utc_time:
# 获取当前时间的UTC时间
utc_time = datetime.utcnow()
# 创建+8时区的对象
target_timezone = pytz.timezone('Asia/Shanghai')
# 将UTC时间转换为目标时区时间
local_time = utc_time.replace(tzinfo=pytz.utc).astimezone(target_timezone)
# 格式化为RFC 3339格式
rfc3339_time = local_time.isoformat(timespec='seconds')
return rfc3339_time
# 定义一个方法,将时间的时区转为UTC时间
def get_utc_time(local_time=None):
if not local_time:
# 获取当前时间的UTC时间
local_time = datetime.now()
# 将本地时间转换为UTC时间
utc_time = local_time.astimezone(pytz.utc)
# 格式化为RFC 3339格式
# rfc3339_time = utc_time.isoformat(timespec='seconds')
return utc_time.strftime('%Y-%m-%d %H:%M:%S')
# 继承cc.clearance.file对象,并重载action_upload方法
class CcClearanceFile(models.Model):
_inherit = "cc.clearance.file"
_description = "Clearance File" # 清关文件
def get_clearance_file_feedback_data(self):
"""通关文件上传数据组织"""
happend_time = get_rfc339_time()
push_data = {
"master_waybill_no": self.bl_id.bl_no or '',
"customs_waybill_id": self.bl_id.customs_bl_no or '',
"operate_time": happend_time,
"file_detail": {
"file_url": "",
"file_code": self.file.decode(), # 将文件内容转换为base64编码,
"file_type": "PDF"
}
}
return push_data
def clearance_file_feedback(self):
if not self.is_upload and self.file:
data = self.get_clearance_file_feedback_data()
tt_api_obj = self.env["ao.tt.api"].sudo()
response = tt_api_obj.clearance_file_feedback(data)
response_data = response.json()
if response_data['code'] != 0:
# 清关文件回传错误
self.is_upload = False
error_msg = response_data['msg']
request_id = response_data['requestID']
code = response_data['code']
self.env['ao.tt.api.log'].sudo().create_api_log(self.file_name or '', '清关文件回传:' + error_msg, data,
code,
request_id, source='推出')
return error_msg
else:
# 清关文件回传成功
self.is_upload = True
self.upload_time = datetime.now()
request_id = response_data['requestID']
self.env['ao.tt.api.log'].sudo().create_api_log(self.file_name or '', '', data, 0, request_id,
source='推出')
return ''
# 重载action_upload方法
def action_sync(self):
self.clearance_file_feedback()
return super(CcClearanceFile, self).action_sync()
# 定义小包同步纪录对象,用于记录小包同步的纪录,包括小包对象,同步时间,操作状态,操作时间,操行备注,同步操作人
class CcShipPackageSyncLog(models.Model):
_name = 'cc.ship.package.sync.log'
_description = 'CC Ship Package Sync Log'
# 定义模型字段
# 小包对象
package_id = fields.Many2one('cc.ship.package', 'Ship Package', required=True)
# 同步时间
sync_time = fields.Datetime('Sync Time', default=fields.Datetime.now)
# 增加接口客户
api_customer = fields.Char('Api Customer')
# 操作状态
process_code = fields.Char('TK Process Code')
# 操作时间
operate_time = fields.Datetime('Operate Time', default=fields.Datetime.now)
# 操作备注
operate_remark = fields.Text('Operate Remark')
# 同步操作人
operate_user = fields.Many2one('res.users', 'Operate User', default=lambda self: self.env.user)
# 添加一个新增日志的方法,传入小包ID,API客户,操作状态,操作备注,操作时间
@api.model
def create_sync_log(self, package_id, api_customer, process_code, operate_remark, operate_time):
vals = {
'package_id': package_id,
'api_customer': api_customer,
'process_code': process_code,
'operate_remark': operate_remark,
'operate_time': operate_time
}
if self._context.get('is_mail'):
public_user = self.env.ref('base.public_user')
vals['operate_user'] = public_user.id
return self.create(vals)
# 继承小包对象,并重载action_sync方法, 增加is_sync字段
class CcShipPackage(models.Model):
_inherit = "cc.ship.package"
is_sync = fields.Boolean('Is Sync', default=False, index=True)
tk_code = fields.Char(related='state.tk_code', store=True, string='TK Code', help='TK Code')
# 增加同步日志纪录字段
sync_log_ids = fields.One2many('cc.ship.package.sync.log', 'package_id', 'Sync Logs')
def is_next_code(self, next_state_id):
"""
判断更新的节点是否是 小包状态的下级节点
:param next_state_id:
:return:
"""
if self.state:
if next_state_id in self.state.next_code_ids.ids:
return True
return False
@api.model
def create(self, vals_list):
"""
第一个节点的时候 默认已同步
"""
obj = super(CcShipPackage, self).create(vals_list)
if obj.state.is_default:
obj.is_sync = True
return obj
def action_sync(self):
for record in self:
record.is_sync = True
self.env['cc.ship.package.sync.log'].sudo().create_sync_log(record.id, 'Tiktok', record.state.tk_code,
record.state_explain,
record.process_time.strftime(
'%Y-%m-%d %H:%M:%S'))
def get_callback_track_data(self):
"""小包上传数据."""
# 获取该提单下的所有同步状态为未同步的小包
push_data = {
"provider_order_id": self.logistic_order_no,
"track_list": [
{
"shipping_method_code": self.state.tk_code,
"operate_time": get_utc_time(self.process_time),
"time_zone": "UTC+0",
"action_code": self.state.tk_code,
"operation_desc": self.state.desc,
"reason_code": self.node_exception_reason_id.name or "" # 异常原因
}
]
}
# logging.info('小包轨迹 push_data:%s' % push_data)
return push_data
def callback_track(self, is_push=True):
if not self.is_sync and self.state and self.state.tk_code:
data = self.get_callback_track_data()
if is_push:
tt_api_obj = self.env["ao.tt.api"].sudo()
response = tt_api_obj.callback_track(data)
response_data = response.json()
if response_data['code'] != 0:
self.is_sync = False
self._cr.commit()
error_msg = response_data['msg']
request_id = response_data['requestID']
code = response_data['code']
self.env['ao.tt.api.log'].sudo().create_api_log(self.tracking_no or '',
'小包状态轨迹回传:' + error_msg,
data,
code,
request_id, source='推出')
return error_msg
else:
# 回传成功
self.is_sync = True
self.env['cc.ship.package.sync.log'].sudo().create_sync_log(self.id, 'Tiktok', self.state.tk_code,
self.state_explain,
self.process_time.strftime(
'%Y-%m-%d %H:%M:%S'))
self._cr.commit()
request_id = response_data['requestID']
self.env['ao.tt.api.log'].sudo().create_api_log(self.tracking_no or '', '', data, 0, request_id,
source='推出')
return ''
else:
self.is_sync = True
self.env['cc.ship.package.sync.log'].sudo().create_sync_log(self.id, 'Tiktok', self.state.tk_code,
self.state_explain,
self.process_time.strftime(
'%Y-%m-%d %H:%M:%S'))
self._cr.commit()
request_id = ''
self.env['ao.tt.api.log'].sudo().create_api_log(self.tracking_no or '', '', data, 0, request_id,
source='推出')
return ''
def search_ship_package_info(self, pda_lang=False):
"""
查询小包信息
:return:
"""
return {
'logistic_order_no': self.logistic_order_no, # 物流订单号
}
# 继承提单对象t
class CcBl(models.Model):
_inherit = 'cc.bl'
# 计算未同步小包数量
@api.depends('ship_package_ids', 'ship_package_ids.is_sync')
def _compute_unsync_package_count(self):
for record in self:
record_counts = record.ship_package_ids.filtered(lambda r: not r.is_sync)
if record_counts:
record.unsync_package_count = len(record_counts)
else:
record.unsync_package_count = 0
# 增加未同步小包数量字段
unsync_package_count = fields.Integer('Unsync Package Count', compute='_compute_unsync_package_count', store=True)
# 定义一个方法, 获取提单下的所有未同步的小包,并回传小包状态
def callback_track(self):
is_ok = True
for item in self:
ship_packages = self.env['cc.ship.package'].search([('bl_id', '=', item.id), ('is_sync', '=', False)])
is_ok = item.package_callback_func(ship_packages.ids)
return is_ok
def package_callback_func(self, ship_package_ids):
"""
同步小包状态
"""
ship_packages = self.env['cc.ship.package'].search([('id', 'in', ship_package_ids), ('is_sync', '=', False)])
logging.info('package_callback_func ship_packages:%s' % len(ship_packages))
is_ok = True
tt_api_obj = self.env["ao.tt.api"].sudo()
async def perform_requests():
ssl_context = ssl.create_default_context(cafile=certifi.where())
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context),
timeout=aiohttp.ClientTimeout(total=60)) as session:
tasks = []
for index, package in enumerate(ship_packages):
if not package.is_sync and package.state and package.state.tk_code:
data = package.get_callback_track_data()
tasks.append(tt_api_obj.async_callback_track(session, data, package.id))
responses = await asyncio.gather(*tasks)
return responses
# 在 Odoo 中运行异步任务
responses = asyncio.run(perform_requests())
for response_item in responses:
response_data = response_item[0]
logging.info('response_data:%s' % response_data)
data = response_item[1]
package_id = response_item[2]
package_order = self.env['cc.ship.package'].sudo().browse(package_id)
if response_data['code'] != 0:
package_order.is_sync = False
self._cr.commit() # 提交事务
error_msg = response_data['msg']
request_id = response_data['requestID']
code = response_data['code']
self.env['ao.tt.api.log'].sudo().create_api_log(
package_order.tracking_no or '', '小包状态轨迹回传:' + error_msg, data, code, request_id,
source='推出')
is_ok = False
else:
# 回传成功
package_order.is_sync = True
self._cr.commit() # 提交事务
self.env['cc.ship.package.sync.log'].sudo().create_sync_log(
package_order.id, 'Tiktok', package_order.state.tk_code, package_order.state_explain,
package_order.process_time.strftime('%Y-%m-%d %H:%M:%S'))
request_id = response_data['requestID']
self.env['ao.tt.api.log'].sudo().create_api_log(
package_order.tracking_no or '', '', data, 0, request_id, source='推出')
# 如果提单有小包变成了清关开始,提单状态变为清关中;如果提单所有小包的清关节点变成"是完成节点",则该提单状态变成已完成
self.change_state_by_ship_package()
return is_ok
def deal_ship_package_state(self):
for item in self:
ship_packages = self.env['cc.ship.package'].search([('bl_id', '=', item.id), ('is_sync', '=', False)])
for package in ship_packages:
package.callback_track(is_push=False)
return True
def batch_action_sync(self):
"""
批量回传通过文件
"""
for item in self:
for line in item.cc_attachment_ids:
line.action_sync()
# 创建显示包裹的action
def action_show_no_sync_ship_package(self):
# 返回一个action,显示包裹
return {
'name': _('Not Sync Ship Packages'),
'type': 'ir.actions.act_window',
'res_model': 'cc.ship.package',
'view_mode': 'tree,form',
'domain': [('bl_id', '=', self.id), ('is_sync', '=', False)],
}
def search_bl_info(self, pda_lang=False, type='tally'):
"""
查询提单信息
"""
vals = {
'bl_no': self.bl_no or '', # 提单号
'scan_big_package_qty': self.tally_big_package_qty + self.delivered_big_package_qty if type == 'tally' else self.delivered_big_package_qty,
# 已扫大包数量
'big_package_arr': [big_package_item.search_big_package_info(pda_lang=pda_lang, type=type) for
big_package_item in
self.big_package_ids],
# 大包信息
'ship_package_arr': [ship_package_item.search_ship_package_info(pda_lang=pda_lang) for ship_package_item in
self.ship_package_ids], # 小包信息
'pallet_arr': self.get_unique_pallet_info(), # 托盘信息
}
return vals
def get_unique_pallet_info(self):
"""获取唯一托盘信息,返回托盘号和最早的使用时间"""
pallet_info = {}
for package in self.big_package_ids:
pallet_number = package.pallet_number
pallet_usage_time = package.pallet_usage_date
if pallet_number and (pallet_number not in pallet_info or pallet_info[pallet_number] > pallet_usage_time):
pallet_info[pallet_number] = pallet_usage_time
return [{'pallet_number': k, 'pallet_usage_time': v} for k, v in pallet_info.items()]
def deal_bl_no(self, bl_no, state_arr=[]):
"""
处理提单号:去掉杠和空格,并转换为小写
:param bl_no:
:return:
"""
processed_bl_no = bl_no.replace('-', '').replace(' ', '').lower()
# 查询所有提单并处理它们的 bl_no
domain = [('state', 'in', state_arr)] if state_arr else []
all_bl_obj = self.env['cc.bl'].sudo().search(domain)
bl_obj = all_bl_obj.filtered(
lambda r: r.bl_no.replace('-', '').replace(' ', '').lower() == processed_bl_no) # 提单
return bl_obj
def try_callback_track(self, max_retries=3, ship_package_ids=[]):
""" 封装的重试逻辑 """
for i in range(max_retries):
if not ship_package_ids:
is_ok = self.callback_track()
else:
is_ok = self.package_callback_func(ship_package_ids)
if is_ok:
return True
logging.warning(f"Attempt {i + 1}/{max_retries} failed. Retrying...")
return False
def mail_auto_push(self, mail_time=False, ship_packages=[], action_type='tally'):
self = self.with_context(dict(self._context, is_mail=True))
for item in self:
# try:
if mail_time:
utc_time = datetime.strptime(mail_time, "%Y-%m-%d %H:%M:%S")
before_min = self.env['ir.config_parameter'].sudo().get_param('before_min') or 10
before_utc_time = utc_time - timedelta(minutes=int(before_min))
item.push_clear_customs_start(before_utc_time)
# 尝试调用 callback_track
if self.try_callback_track():
item.push_clear_customs_end(utc_time)
# 再次尝试调用 callback_track
if not self.try_callback_track():
logging.error(f"Failed to push item after {3} attempts.")
else:
logging.error(f"Failed to start process for item after {3} attempts.")
elif ship_packages:
ship_package_ids = [ship_package_dict for sublist in [d['id'] for d in ship_packages] for
ship_package_dict in sublist]
tally_state = 'checked_goods' if action_type == 'tally' else 'handover_completed'
# 后续节点
node_obj = self.env['cc.node'].sudo().search([
('node_type', '=', 'package'),
('tally_state', '=', tally_state) # 检查理货或尾程交接的节点,根据排序进行升序
], order='seq asc')
if node_obj:
all_ship_package_obj = self.env['cc.ship.package'].search(
[('id', 'in', ship_package_ids)]) # 所有小包
# 预先获取所有同步日志 - 批量查询
all_sync_logs = self.env['cc.ship.package.sync.log'].sudo().search([
('package_id', 'in', ship_package_ids)
])
# 构建同步日志字典以加快查找
sync_log_dict = {}
for log in all_sync_logs:
if log.package_id.id not in sync_log_dict:
sync_log_dict[log.package_id.id] = set()
sync_log_dict[log.package_id.id].add(log.process_code)
# 构建ship_packages字典,用于快速查找
ship_packages_dict = {}
for package in ship_packages:
# 如果一个id在多个package中出现,使用最后一个package的tally_time
if package.get('tally_time'):
for single_id in package['id']:
ship_packages_dict[single_id] = package['tally_time']
# 前序节点 理货或尾程交接之前没有生成的节点
before_node_obj = self.env['cc.node'].sudo().search([
('node_type', '=', 'package'), ('is_must', '=', True), ('seq', '<', node_obj[0].seq)],
order='seq asc')
# 理货或尾程交接之前没有生成的节点
for before_node in before_node_obj:
print('before_node:%s' % before_node.name)
before_minutes = before_node.calculate_total_interval(node_obj[0])
# 准备批量更新数据
update_data = []
for package in all_ship_package_obj:
package_id = package.id
if package_id not in sync_log_dict or before_node.tk_code not in sync_log_dict.get(package_id, set()):
tally_time = ship_packages_dict.get(package_id)
if tally_time:
operation_time = (datetime.strptime(tally_time, '%Y-%m-%d %H:%M:%S') - timedelta(
minutes=before_minutes)) if tally_time else fields.Datetime.now() - timedelta(
minutes=before_minutes)
update_data.append((
package_id,
before_node.id,
operation_time,
before_node.desc,
True if before_node.is_default else False
))
if update_data:
# 构建批量更新SQL
values_str = ','.join(self.env.cr.mogrify("(%s,%s,%s,%s,%s)", row).decode('utf-8') for row in update_data)
sql = """
UPDATE cc_ship_package AS t SET
state = c.state,
process_time = c.process_time,
state_explain = c.state_explain,
is_sync = c.is_sync
FROM (VALUES
{}
) AS c(id, state, process_time, state_explain, is_sync)
WHERE t.id = c.id
""".format(values_str)
self.env.cr.execute(sql)
self._cr.commit() # 提交事务
# # 更新提单的未同步小包数量
# sql = """
# UPDATE cc_bl AS bl SET
# unsync_package_count = (
# SELECT COUNT(*)
# FROM cc_ship_package sp
# WHERE sp.bl_id = bl.id
# AND sp.is_sync = false
# )
# WHERE bl.id = %s
# """
# self.env.cr.execute(sql, (item.id,))
# self._cr.commit() # 提交事务
self.try_callback_track(max_retries=2, ship_package_ids=ship_package_ids)
# 理货或尾程交接的节点
# 预先获取所有状态节点
all_state_nodes = self.env['cc.node'].sudo().search([
('node_type', '=', 'package')
])
state_node_dict = {node.name: node for node in all_state_nodes}
next_minutes = int(self.env['ir.config_parameter'].sudo().get_param('next_minutes', default=20))
for index, node in enumerate(node_obj):
print('node:%s' % node.name)
update_data = []
for package in all_ship_package_obj:
if package.state.name in state_node_dict:
current_state_node = state_node_dict[package.state.name]
if current_state_node.seq < node.seq:
tally_time = ship_packages_dict.get(package.id)
if tally_time:
operation_time = (datetime.strptime(tally_time, '%Y-%m-%d %H:%M:%S') + timedelta(
minutes=next_minutes * index)) if tally_time else fields.Datetime.now() + timedelta(
minutes=next_minutes * index)
update_data.append((
package.id,
node.id,
operation_time,
node.desc,
True if node.is_default else False
))
print('update_data:%s' % update_data)
if update_data:
print('11111111111')
# 构建批量更新SQL
values_str = ','.join(self.env.cr.mogrify("(%s,%s,%s,%s,%s)", row).decode('utf-8') for row in update_data)
sql = """
UPDATE cc_ship_package AS t SET
state = c.state,
process_time = c.process_time,
state_explain = c.state_explain,
is_sync = c.is_sync
FROM (VALUES
{}
) AS c(id, state, process_time, state_explain, is_sync)
WHERE t.id = c.id
""".format(values_str)
self.env.cr.execute(sql)
self._cr.commit() # 提交事务
# # 更新提单的未同步小包数量
# sql = """UPDATE cc_bl AS bl SET unsync_package_count = ( SELECT COUNT(*) FROM cc_ship_package sp WHERE sp.bl_id = bl.id AND sp.is_sync = false) WHERE bl.id = %s """
# self.env.cr.execute(sql, (item.id,))
# self._cr.commit() # 提交事务
print('222222')
self.try_callback_track(max_retries=2, ship_package_ids=ship_package_ids)
print('33333')
return True
# except Exception as err:
# logging.error('fetch_mail_dlv--error:%s' % str(err))
def change_state_by_ship_package(self):
"""
根据小包的状态修改提单的状态
:return:
"""
# 如果提单有小包变成了清关开始,提单状态变为清关中
if self.state == 'draft' and self.ship_package_ids.filtered(
lambda line: line.state.tk_code == 'cb_imcustoms_start'):
self.ccing_func()
# 如果提单所有小包的清关节点变成"是完成节点",则该提单状态变成已完成
if all(line.state.is_done for line in self.ship_package_ids) and self.unsync_package_count <= 0:
self.done_func()
class CcBigPackage(models.Model):
# 模型名称
_inherit = 'cc.big.package'
# 模型描述
_description = 'Big Package'
def search_big_package_info(self, pda_lang=False, type='tally'):
"""
查询大包信息
"""
unprocessed_goods_msg_dic = {
'en': 'Unprocessed goods',
'zh': '未理货'
}
checked_goods_msg_dic = {
'en': 'Checked goods',
'zh': '已理货'
}
handover_completed_msg_dic = {
'en': 'Handover Completed',
'zh': '尾程交接'
}
state_arr = {'unprocessed_goods': unprocessed_goods_msg_dic[pda_lang],
'checked_goods': checked_goods_msg_dic[pda_lang],
'handover_completed': handover_completed_msg_dic[pda_lang]} # 未理货/已理货/尾程交接
# 根据下一阶段服务商名称获取尾程服务商的记录
provider_obj = self.env['cc.last.mile.provider'].match_provider(self.next_provider_name)
vals = {
'tally_state_label': state_arr[self.tally_state] or '', # 理货状态显示名称
'tally_state': self.tally_state or '', # 理货状态系统KEY
'tally_user_id': (self.tally_user_id.id or 0) if type == 'tally' else (self.delivery_user_id.id or 0),
# 理货人id/交货人id
'tally_user_name': (self.tally_user_id.name or '') if type == 'tally' else (
self.delivery_user_id.name or ''),
# 理货人名称/交货人名称
'tally_time': (self.tally_time or '') if type == 'tally' else (self.delivery_time or ''),
# self.env['common.common'].sudo().get_format_time(str(self.tally_time)) if self.tally_time else '',
# 理货时间/交货时间
'big_package_no': self.big_package_no or '', # 大包号
'next_service_provider_name': self.next_provider_name or '', # 下一个服务商名称
'next_service_provider_tape_color': (provider_obj.tape_color_value or '') if provider_obj else '',
# 下一个服务商胶带对应色值
'pallet_number': self.pallet_number or '', # 托盘号
'pallet_usage_time': self.pallet_usage_date or '' # 托盘使用时间
}
return vals
def update_big_package_info(self, **kwargs):
"""
理货 tally/尾程交接 handover
"""
action_type = kwargs.get('action_type')
for item in self:
if action_type == 'tally' and item.tally_state == 'unprocessed_goods':
# 更新理货信息
self._update_info(item, kwargs, 'tally')
elif action_type == 'handover' and item.tally_state != 'handover_completed':
# 更新交接信息
self._update_info(item, kwargs, 'handover')
def _update_info(self, item, kwargs, action_type):
"""
更新信息的通用方法
"""
if action_type == 'tally':
if kwargs.get('tally_state'):
item.tally_state = kwargs['tally_state']
if kwargs.get('tally_user_id'):
item.tally_user_id = kwargs['tally_user_id']
if kwargs.get('tally_time'):
item.tally_time = datetime.strptime(kwargs['tally_time'], '%Y-%m-%d %H:%M:%S')
elif action_type == 'handover':
if kwargs.get('tally_state'):
item.tally_state = kwargs['tally_state']
if kwargs.get('tally_user_id'):
item.delivery_user_id = kwargs['tally_user_id']
if kwargs.get('tally_time'):
item.delivery_time = datetime.strptime(kwargs['tally_time'], '%Y-%m-%d %H:%M:%S')
import asyncio
import logging
import ssl
from datetime import timedelta, datetime
import aiohttp
import certifi
import pytz
from odoo import models, fields, api, _
def get_rfc339_time(utc_time=None):
if not utc_time:
# 获取当前时间的UTC时间
utc_time = datetime.utcnow()
# 创建+8时区的对象
target_timezone = pytz.timezone('Asia/Shanghai')
# 将UTC时间转换为目标时区时间
local_time = utc_time.replace(tzinfo=pytz.utc).astimezone(target_timezone)
# 格式化为RFC 3339格式
rfc3339_time = local_time.isoformat(timespec='seconds')
return rfc3339_time
# 定义一个方法,将时间的时区转为UTC时间
def get_utc_time(local_time=None):
if not local_time:
# 获取当前时间的UTC时间
local_time = datetime.now()
# 将本地时间转换为UTC时间
utc_time = local_time.astimezone(pytz.utc)
# 格式化为RFC 3339格式
# rfc3339_time = utc_time.isoformat(timespec='seconds')
return utc_time.strftime('%Y-%m-%d %H:%M:%S')
# 继承cc.clearance.file对象,并重载action_upload方法
class CcClearanceFile(models.Model):
_inherit = "cc.clearance.file"
_description = "Clearance File" # 清关文件
def get_clearance_file_feedback_data(self):
"""通关文件上传数据组织"""
happend_time = get_rfc339_time()
push_data = {
"master_waybill_no": self.bl_id.bl_no or '',
"customs_waybill_id": self.bl_id.customs_bl_no or '',
"operate_time": happend_time,
"file_detail": {
"file_url": "",
"file_code": self.file.decode(), # 将文件内容转换为base64编码,
"file_type": "PDF"
}
}
return push_data
def clearance_file_feedback(self):
if not self.is_upload and self.file:
data = self.get_clearance_file_feedback_data()
tt_api_obj = self.env["ao.tt.api"].sudo()
response = tt_api_obj.clearance_file_feedback(data)
response_data = response.json()
if response_data['code'] != 0:
# 清关文件回传错误
self.is_upload = False
error_msg = response_data['msg']
request_id = response_data['requestID']
code = response_data['code']
self.env['ao.tt.api.log'].sudo().create_api_log(self.file_name or '', '清关文件回传:' + error_msg, data,
code,
request_id, source='推出')
return error_msg
else:
# 清关文件回传成功
self.is_upload = True
self.upload_time = datetime.now()
request_id = response_data['requestID']
self.env['ao.tt.api.log'].sudo().create_api_log(self.file_name or '', '', data, 0, request_id,
source='推出')
return ''
# 重载action_upload方法
def action_sync(self):
self.clearance_file_feedback()
return super(CcClearanceFile, self).action_sync()
# 定义小包同步纪录对象,用于记录小包同步的纪录,包括小包对象,同步时间,操作状态,操作时间,操行备注,同步操作人
class CcShipPackageSyncLog(models.Model):
_name = 'cc.ship.package.sync.log'
_description = 'CC Ship Package Sync Log'
# 定义模型字段
# 小包对象
package_id = fields.Many2one('cc.ship.package', 'Ship Package', required=True)
# 同步时间
sync_time = fields.Datetime('Sync Time', default=fields.Datetime.now)
# 增加接口客户
api_customer = fields.Char('Api Customer')
# 操作状态
process_code = fields.Char('TK Process Code')
# 操作时间
operate_time = fields.Datetime('Operate Time', default=fields.Datetime.now)
# 操作备注
operate_remark = fields.Text('Operate Remark')
# 同步操作人
operate_user = fields.Many2one('res.users', 'Operate User', default=lambda self: self.env.user)
# 添加一个新增日志的方法,传入小包ID,API客户,操作状态,操作备注,操作时间
@api.model
def create_sync_log(self, package_id, api_customer, process_code, operate_remark, operate_time):
vals = {
'package_id': package_id,
'api_customer': api_customer,
'process_code': process_code,
'operate_remark': operate_remark,
'operate_time': operate_time
}
if self._context.get('is_mail'):
public_user = self.env.ref('base.public_user')
vals['operate_user'] = public_user.id
return self.create(vals)
# 继承小包对象,并重载action_sync方法, 增加is_sync字段
class CcShipPackage(models.Model):
_inherit = "cc.ship.package"
is_sync = fields.Boolean('Is Sync', default=False, index=True)
tk_code = fields.Char(related='state.tk_code', store=True, string='TK Code', help='TK Code')
# 增加同步日志纪录字段
sync_log_ids = fields.One2many('cc.ship.package.sync.log', 'package_id', 'Sync Logs')
def is_next_code(self, next_state_id):
"""
判断更新的节点是否是 小包状态的下级节点
:param next_state_id:
:return:
"""
if self.state:
if next_state_id in self.state.next_code_ids.ids:
return True
return False
@api.model
def create(self, vals_list):
"""
第一个节点的时候 默认已同步
"""
obj = super(CcShipPackage, self).create(vals_list)
if obj.state.is_default:
obj.is_sync = True
return obj
def action_sync(self):
for record in self:
record.is_sync = True
self.env['cc.ship.package.sync.log'].sudo().create_sync_log(record.id, 'Tiktok', record.state.tk_code,
record.state_explain,
record.process_time.strftime(
'%Y-%m-%d %H:%M:%S'))
def get_callback_track_data(self):
"""小包上传数据."""
# 获取该提单下的所有同步状态为未同步的小包
push_data = {
"provider_order_id": self.logistic_order_no,
"track_list": [
{
"shipping_method_code": self.state.tk_code,
"operate_time": get_utc_time(self.process_time),
"time_zone": "UTC+0",
"action_code": self.state.tk_code,
"operation_desc": self.state.desc,
"reason_code": self.node_exception_reason_id.name or "" # 异常原因
}
]
}
# logging.info('小包轨迹 push_data:%s' % push_data)
return push_data
def callback_track(self, is_push=True):
if not self.is_sync and self.state and self.state.tk_code:
data = self.get_callback_track_data()
if is_push:
tt_api_obj = self.env["ao.tt.api"].sudo()
response = tt_api_obj.callback_track(data)
response_data = response.json()
if response_data['code'] != 0:
self.is_sync = False
self._cr.commit()
error_msg = response_data['msg']
request_id = response_data['requestID']
code = response_data['code']
self.env['ao.tt.api.log'].sudo().create_api_log(self.tracking_no or '',
'小包状态轨迹回传:' + error_msg,
data,
code,
request_id, source='推出')
return error_msg
else:
# 回传成功
self.is_sync = True
self.env['cc.ship.package.sync.log'].sudo().create_sync_log(self.id, 'Tiktok', self.state.tk_code,
self.state_explain,
self.process_time.strftime(
'%Y-%m-%d %H:%M:%S'))
self._cr.commit()
request_id = response_data['requestID']
self.env['ao.tt.api.log'].sudo().create_api_log(self.tracking_no or '', '', data, 0, request_id,
source='推出')
return ''
else:
self.is_sync = True
self.env['cc.ship.package.sync.log'].sudo().create_sync_log(self.id, 'Tiktok', self.state.tk_code,
self.state_explain,
self.process_time.strftime(
'%Y-%m-%d %H:%M:%S'))
self._cr.commit()
request_id = ''
self.env['ao.tt.api.log'].sudo().create_api_log(self.tracking_no or '', '', data, 0, request_id,
source='推出')
return ''
def search_ship_package_info(self, pda_lang=False):
"""
查询小包信息
:return:
"""
return {
'logistic_order_no': self.logistic_order_no, # 物流订单号
}
# 继承提单对象t
class CcBl(models.Model):
_inherit = 'cc.bl'
# 计算未同步小包数量
@api.depends('ship_package_ids', 'ship_package_ids.is_sync')
def _compute_unsync_package_count(self):
for record in self:
record_counts = record.ship_package_ids.filtered(lambda r: not r.is_sync)
if record_counts:
record.unsync_package_count = len(record_counts)
else:
record.unsync_package_count = 0
# 增加未同步小包数量字段
unsync_package_count = fields.Integer('Unsync Package Count', compute='_compute_unsync_package_count', store=True)
# 定义一个方法, 获取提单下的所有未同步的小包,并回传小包状态
def callback_track(self):
is_ok = True
for item in self:
ship_packages = self.env['cc.ship.package'].search([('bl_id', '=', item.id), ('is_sync', '=', False)])
is_ok = item.package_callback_func(ship_packages.ids)
return is_ok
def package_callback_func(self, ship_package_ids):
"""
同步小包状态
"""
ship_packages = self.env['cc.ship.package'].search([('id', 'in', ship_package_ids), ('is_sync', '=', False)])
logging.info('package_callback_func ship_packages:%s' % len(ship_packages))
is_ok = True
tt_api_obj = self.env["ao.tt.api"].sudo()
async def perform_requests():
ssl_context = ssl.create_default_context(cafile=certifi.where())
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context),
timeout=aiohttp.ClientTimeout(total=60)) as session:
tasks = []
for index, package in enumerate(ship_packages):
if not package.is_sync and package.state and package.state.tk_code:
data = package.get_callback_track_data()
tasks.append(tt_api_obj.async_callback_track(session, data, package.id))
responses = await asyncio.gather(*tasks)
return responses
# 在 Odoo 中运行异步任务
responses = asyncio.run(perform_requests())
for response_item in responses:
response_data = response_item[0]
logging.info('response_data:%s' % response_data)
data = response_item[1]
package_id = response_item[2]
package_order = self.env['cc.ship.package'].sudo().browse(package_id)
if response_data['code'] != 0:
package_order.is_sync = False
error_msg = response_data['msg']
request_id = response_data['requestID']
code = response_data['code']
self.env['ao.tt.api.log'].sudo().create_api_log(
package_order.tracking_no or '', '小包状态轨迹回传:' + error_msg, data, code, request_id,
source='推出')
is_ok = False
else:
# 回传成功
package_order.is_sync = True
self.env['cc.ship.package.sync.log'].sudo().create_sync_log(
package_order.id, 'Tiktok', package_order.state.tk_code, package_order.state_explain,
package_order.process_time.strftime('%Y-%m-%d %H:%M:%S'))
request_id = response_data['requestID']
self.env['ao.tt.api.log'].sudo().create_api_log(
package_order.tracking_no or '', '', data, 0, request_id, source='推出')
# 如果提单有小包变成了清关开始,提单状态变为清关中;如果提单所有小包的清关节点变成"是完成节点",则该提单状态变成已完成
self.change_state_by_ship_package()
return is_ok
def deal_ship_package_state(self):
for item in self:
ship_packages = self.env['cc.ship.package'].search([('bl_id', '=', item.id), ('is_sync', '=', False)])
for package in ship_packages:
package.callback_track(is_push=False)
return True
def batch_action_sync(self):
"""
批量回传通过文件
"""
for item in self:
for line in item.cc_attachment_ids:
line.action_sync()
# 创建显示包裹的action
def action_show_no_sync_ship_package(self):
# 返回一个action,显示包裹
return {
'name': _('Not Sync Ship Packages'),
'type': 'ir.actions.act_window',
'res_model': 'cc.ship.package',
'view_mode': 'tree,form',
'domain': [('bl_id', '=', self.id), ('is_sync', '=', False)],
}
def search_bl_info(self, pda_lang=False, type='tally'):
"""
查询提单信息
"""
vals = {
'bl_no': self.bl_no or '', # 提单号
'scan_big_package_qty': self.tally_big_package_qty + self.delivered_big_package_qty if type == 'tally' else self.delivered_big_package_qty,
# 已扫大包数量
'big_package_arr': [big_package_item.search_big_package_info(pda_lang=pda_lang, type=type) for
big_package_item in
self.big_package_ids],
# 大包信息
'ship_package_arr': [ship_package_item.search_ship_package_info(pda_lang=pda_lang) for ship_package_item in
self.ship_package_ids], # 小包信息
'pallet_arr': self.get_unique_pallet_info(), # 托盘信息
}
return vals
def get_unique_pallet_info(self):
"""获取唯一托盘信息,返回托盘号和最早的使用时间"""
pallet_info = {}
for package in self.big_package_ids:
pallet_number = package.pallet_number
pallet_usage_time = package.pallet_usage_date
if pallet_number and (pallet_number not in pallet_info or pallet_info[pallet_number] > pallet_usage_time):
pallet_info[pallet_number] = pallet_usage_time
return [{'pallet_number': k, 'pallet_usage_time': v} for k, v in pallet_info.items()]
def deal_bl_no(self, bl_no, state_arr=[]):
"""
处理提单号:去掉杠和空格,并转换为小写
:param bl_no:
:return:
"""
processed_bl_no = bl_no.replace('-', '').replace(' ', '').lower()
# 查询所有提单并处理它们的 bl_no
domain = [('state', 'in', state_arr)] if state_arr else []
all_bl_obj = self.env['cc.bl'].sudo().search(domain)
bl_obj = all_bl_obj.filtered(
lambda r: r.bl_no.replace('-', '').replace(' ', '').lower() == processed_bl_no) # 提单
return bl_obj
def try_callback_track(self, max_retries=3, ship_package_ids=[]):
""" 封装的重试逻辑 """
for i in range(max_retries):
if not ship_package_ids:
is_ok = self.callback_track()
else:
is_ok = self.package_callback_func(ship_package_ids)
if is_ok:
return True
logging.warning(f"Attempt {i + 1}/{max_retries} failed. Retrying...")
return False
def mail_auto_push(self, mail_time=False, ship_packages=[], action_type='tally'):
self = self.with_context(dict(self._context, is_mail=True))
for item in self:
# try:
if mail_time:
utc_time = datetime.strptime(mail_time, "%Y-%m-%d %H:%M:%S")
before_min = self.env['ir.config_parameter'].sudo().get_param('before_min') or 10
before_utc_time = utc_time - timedelta(minutes=int(before_min))
item.push_clear_customs_start(before_utc_time)
# 尝试调用 callback_track
if self.try_callback_track():
item.push_clear_customs_end(utc_time)
# 再次尝试调用 callback_track
if not self.try_callback_track():
logging.error(f"Failed to push item after {3} attempts.")
else:
logging.error(f"Failed to start process for item after {3} attempts.")
elif ship_packages:
ship_package_ids = [ship_package_dict for sublist in [d['id'] for d in ship_packages] for
ship_package_dict in sublist]
tally_state = 'checked_goods' if action_type == 'tally' else 'handover_completed'
# 后续节点
node_obj = self.env['cc.node'].sudo().search([
('node_type', '=', 'package'),
('tally_state', '=', tally_state) # 检查理货或尾程交接的节点,根据排序进行升序
], order='seq asc')
if node_obj:
all_ship_package_obj = self.env['cc.ship.package'].search(
[('id', 'in', ship_package_ids)]) # 所有小包
# 预先获取所有同步日志 - 批量查询
all_sync_logs = self.env['cc.ship.package.sync.log'].sudo().search([
('package_id', 'in', ship_package_ids)
])
# 构建同步日志字典以加快查找
sync_log_dict = {}
for log in all_sync_logs:
if log.package_id.id not in sync_log_dict:
sync_log_dict[log.package_id.id] = set()
sync_log_dict[log.package_id.id].add(log.process_code)
# 构建ship_packages字典,用于快速查找
ship_packages_dict = {}
for package in ship_packages:
# 如果一个id在多个package中出现,使用最后一个package的tally_time
if package.get('tally_time'):
for single_id in package['id']:
ship_packages_dict[single_id] = package['tally_time']
# 前序节点 理货或尾程交接之前没有生成的节点
before_node_obj = self.env['cc.node'].sudo().search([
('node_type', '=', 'package'), ('is_must', '=', True), ('seq', '<', node_obj[0].seq)],
order='seq asc')
# 理货或尾程交接之前没有生成的节点
for before_node in before_node_obj:
before_minutes = before_node.calculate_total_interval(node_obj[0])
for package in all_ship_package_obj:
package_id = package.id
if package_id not in sync_log_dict or before_node.tk_code not in sync_log_dict.get(
package_id, set()):
tally_time = ship_packages_dict.get(package_id)
if tally_time:
operation_time = (
datetime.strptime(tally_time, '%Y-%m-%d %H:%M:%S') - timedelta(
minutes=before_minutes)) if tally_time else fields.Datetime.now() - timedelta(
minutes=before_minutes)
package.write({
'state': before_node.id,
'process_time': operation_time,
'state_explain': before_node.desc,
'is_sync': True if before_node.is_default else False
})
self.try_callback_track(max_retries=2, ship_package_ids=ship_package_ids)
# 理货或尾程交接的节点
# 预先获取所有状态节点
all_state_nodes = self.env['cc.node'].sudo().search([
('node_type', '=', 'package')
])
state_node_dict = {node.name: node for node in all_state_nodes}
next_minutes = int(self.env['ir.config_parameter'].sudo().get_param('next_minutes', default=20))
for index, node in enumerate(node_obj):
for package in all_ship_package_obj:
if package.state.name in state_node_dict:
current_state_node = state_node_dict[package.state.name]
if current_state_node.seq < node.seq:
tally_time = ship_packages_dict.get(package.id)
if tally_time:
operation_time = (
datetime.strptime(tally_time, '%Y-%m-%d %H:%M:%S') + timedelta(
minutes=next_minutes * index)) if tally_time else fields.Datetime.now() + timedelta(
minutes=next_minutes * index)
package.write({
'state': node.id,
'process_time': operation_time,
'state_explain': node.desc,
'is_sync': True if node.is_default else False
})
self.try_callback_track(max_retries=2, ship_package_ids=ship_package_ids)
return True
# except Exception as err:
# logging.error('fetch_mail_dlv--error:%s' % str(err))
def change_state_by_ship_package(self):
"""
根据小包的状态修改提单的状态
:return:
"""
# 如果提单有小包变成了清关开始,提单状态变为清关中
if self.state == 'draft' and self.ship_package_ids.filtered(
lambda line: line.state.tk_code == 'cb_imcustoms_start'):
self.ccing_func()
# 如果提单所有小包的清关节点变成"是完成节点",则该提单状态变成已完成
if all(line.state.is_done for line in self.ship_package_ids) and self.unsync_package_count <= 0:
self.done_func()
class CcBigPackage(models.Model):
# 模型名称
_inherit = 'cc.big.package'
# 模型描述
_description = 'Big Package'
def search_big_package_info(self, pda_lang=False, type='tally'):
"""
查询大包信息
"""
unprocessed_goods_msg_dic = {
'en': 'Unprocessed goods',
'zh': '未理货'
}
checked_goods_msg_dic = {
'en': 'Checked goods',
'zh': '已理货'
}
handover_completed_msg_dic = {
'en': 'Handover Completed',
'zh': '尾程交接'
}
state_arr = {'unprocessed_goods': unprocessed_goods_msg_dic[pda_lang],
'checked_goods': checked_goods_msg_dic[pda_lang],
'handover_completed': handover_completed_msg_dic[pda_lang]} # 未理货/已理货/尾程交接
# 根据下一阶段服务商名称获取尾程服务商的记录
provider_obj = self.env['cc.last.mile.provider'].match_provider(self.next_provider_name)
vals = {
'tally_state_label': state_arr[self.tally_state] or '', # 理货状态显示名称
'tally_state': self.tally_state or '', # 理货状态系统KEY
'tally_user_id': (self.tally_user_id.id or 0) if type == 'tally' else (self.delivery_user_id.id or 0),
# 理货人id/交货人id
'tally_user_name': (self.tally_user_id.name or '') if type == 'tally' else (
self.delivery_user_id.name or ''),
# 理货人名称/交货人名称
'tally_time': (self.tally_time or '') if type == 'tally' else (self.delivery_time or ''),
# self.env['common.common'].sudo().get_format_time(str(self.tally_time)) if self.tally_time else '',
# 理货时间/交货时间
'big_package_no': self.big_package_no or '', # 大包号
'next_service_provider_name': self.next_provider_name or '', # 下一个服务商名称
'next_service_provider_tape_color': (provider_obj.tape_color_value or '') if provider_obj else '',
# 下一个服务商胶带对应色值
'pallet_number': self.pallet_number or '', # 托盘号
'pallet_usage_time': self.pallet_usage_date or '' # 托盘使用时间
}
return vals
def update_big_package_info(self, **kwargs):
"""
理货 tally/尾程交接 handover
"""
action_type = kwargs.get('action_type')
for item in self:
if action_type == 'tally' and item.tally_state == 'unprocessed_goods':
# 更新理货信息
self._update_info(item, kwargs, 'tally')
elif action_type == 'handover' and item.tally_state != 'handover_completed':
# 更新交接信息
self._update_info(item, kwargs, 'handover')
def _update_info(self, item, kwargs, action_type):
"""
更新信息的通用方法
"""
if action_type == 'tally':
if kwargs.get('tally_state'):
item.tally_state = kwargs['tally_state']
if kwargs.get('tally_user_id'):
item.tally_user_id = kwargs['tally_user_id']
if kwargs.get('tally_time'):
item.tally_time = datetime.strptime(kwargs['tally_time'], '%Y-%m-%d %H:%M:%S')
elif action_type == 'handover':
if kwargs.get('tally_state'):
item.tally_state = kwargs['tally_state']
if kwargs.get('tally_user_id'):
item.delivery_user_id = kwargs['tally_user_id']
if kwargs.get('tally_time'):
item.delivery_time = datetime.strptime(kwargs['tally_time'], '%Y-%m-%d %H:%M:%S')
...@@ -25,14 +25,14 @@ ...@@ -25,14 +25,14 @@
<field name="inherit_id" ref="ccs_base.form_cc_bl_view"/> <field name="inherit_id" ref="ccs_base.form_cc_bl_view"/>
<field name="arch" type="xml"> <field name="arch" type="xml">
<button name="%(ccs_base.action_batch_input_ship_package_wizard)d" position="after"> <button name="%(ccs_base.action_batch_input_ship_package_wizard)d" position="after">
<button name="callback_track" string="Sync Package Status" type="object"/> <button name="action_callback_track" string="Sync Package Status" type="object"/>
</button> </button>
<button name="%(ccs_base.action_batch_input_bl_status_wizard)d" position="before"> <button name="%(ccs_base.action_batch_input_bl_status_wizard)d" position="before">
<button name="batch_action_sync" string="Sync CC Attachment" type="object"/> <button name="batch_action_sync" string="Sync CC Attachment" type="object"/>
</button> </button>
<button name="%(ccs_base.action_batch_input_bl_status_wizard)d" position="after"> <button name="%(ccs_base.action_batch_input_bl_status_wizard)d" position="after">
<!--增加同步提单状态的按钮--> <!--增加同步提单状态的按钮-->
<button name="callback_track_bl" string="Sync Bill Of Loading Status" type="object"/> <button name="action_callback_track_bl" string="Sync Bill Of Loading Status" type="object"/>
</button> </button>
<button name="action_show_ship_package" position="replace"> <button name="action_show_ship_package" position="replace">
...@@ -73,7 +73,7 @@ ...@@ -73,7 +73,7 @@
<field name="state">code</field> <field name="state">code</field>
<field name="code"> <field name="code">
if records: if records:
records.callback_track() records.action_callback_track()
</field> </field>
</record> </record>
...@@ -85,7 +85,7 @@ ...@@ -85,7 +85,7 @@
<field name="state">code</field> <field name="state">code</field>
<field name="code"> <field name="code">
if records: if records:
records.callback_track_bl() records.action_callback_track_bl()
</field> </field>
</record> </record>
......
...@@ -8,12 +8,12 @@ import redis ...@@ -8,12 +8,12 @@ import redis
import config import config
# 默认字符gbk # 默认字符gbk
logging.basicConfig(filename='./push_data_logger.log', level=logging.INFO) # logging.basicConfig(filename='./push_data_logger.log', level=logging.INFO)
# 设置文件字符为utf-8 # 设置文件字符为utf-8
# logging.basicConfig(handlers=[logging.FileHandler('logs/mail_push.log', 'a', 'utf-8')], logging.basicConfig(handlers=[logging.FileHandler('logs/mail_push.log', 'a', 'utf-8')],
# format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
class Order_dispose(object): class Order_dispose(object):
...@@ -40,7 +40,8 @@ class Order_dispose(object): ...@@ -40,7 +40,8 @@ class Order_dispose(object):
bl_record = bl_obj.browse(data['id']) bl_record = bl_obj.browse(data['id'])
# utc_time = datetime.strptime(data['utc_time'], "%Y-%m-%d %H:%M:%S") # utc_time = datetime.strptime(data['utc_time'], "%Y-%m-%d %H:%M:%S")
utc_time = data.get('utc_time') utc_time = data.get('utc_time')
bl_record.mail_auto_push(utc_time, ship_packages, action_type) user_login = data.get('user_login')
bl_record.mail_auto_push(utc_time, ship_packages, action_type, user_login, config.pda_db_user)
except Exception as ex: except Exception as ex:
logging.error('mail_auto_push error:%s' % str(ex)) logging.error('mail_auto_push error:%s' % str(ex))
return res_data return res_data
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论