Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
X
xqh_temu_api
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
lqy
xqh_temu_api
Commits
351a9d6e
提交
351a9d6e
authored
2月 06, 2026
作者:
刘擎阳
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
1.服务提交
上级
e2472b49
隐藏空白字符变更
内嵌
并排
正在显示
12 个修改的文件
包含
385 行增加
和
1694 行删除
+385
-1694
temu_api.py
app/api/temu_api.py
+80
-83
manage.py
app/manage.py
+1
-1
create_pdf.py
services/create_pdf.py
+66
-0
config.py
services/dependence/services/config.py
+1
-1
db_service.py
services/dependence/services/db_service.py
+2
-2
util.py
services/dependence/services/util.py
+6
-7
tiktok_custom.log
services/logs/tiktok_custom.log
+0
-0
tiktok_queue.log
services/logs/tiktok_queue.log
+0
-0
create_pdf.conf
services/supervisord_conf/conf.d/create_pdf.conf
+5
-5
temu_service.conf
services/supervisord_conf/conf.d/temu_service.conf
+3
-3
temu_service.py
services/temu_service.py
+221
-786
tiktok_queue.py
services/tiktok_queue.py
+0
-806
没有找到文件。
app/api/temu_api.py
浏览文件 @
351a9d6e
...
...
@@ -41,6 +41,7 @@ _logger = logging.getLogger(__name__)
@api.route
(
'/temu/create/order'
,
methods
=
[
'post'
])
# @check_customer
def
temu_create_order
():
"""创建订单"""
res
=
{
"success"
:
True
,
"errorCode"
:
0
,
...
...
@@ -91,74 +92,102 @@ def temu_create_order():
@api.route
(
'/temu/update/order'
,
methods
=
[
'post'
])
# @check_customer
def
temu_update_order
():
"""更新订单"""
# 接收提单信息、大包与小包的关联信息
res
=
{
"code"
:
0
,
"msg"
:
"success"
,
"success"
:
True
,
"errorCode"
:
0
,
"errorMsg"
:
"success"
,
"requestID"
:
"202312251715021060522200417739B9"
,
"ts"
:
"2023-12-25 17:15:03"
"serverTimeMs"
:
int
(
time
.
time
()
*
1000
),
"result"
:
None
,
}
request_time
=
datetime
.
utcnow
()
timestamp
=
int
(
time
.
time
())
res
[
'ts'
]
=
request_time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
#
res['ts'] = request_time.strftime("%Y-%m-%d %H:%M:%S")
res
[
'requestID'
]
=
request_time
.
strftime
(
"
%
Y
%
m
%
d
%
H
%
M
%
S"
)
+
str
(
timestamp
)
request_data
=
request
.
form
[
'param_json'
]
_logger
.
info
(
'mawb_declare kw:
%
s'
%
request_data
)
request_data
=
request
.
get_json
()
_logger
.
info
(
'temu_create_order:
%
s'
%
request_data
)
result
=
{}
if
request_data
:
request_data
=
json
.
loads
(
request_data
)
request_data
[
'request_id'
]
=
res
[
'requestID'
]
master_waybill_no
=
request_data
[
'master_waybill_no'
]
if
request_data
.
get
(
'master_waybill_no'
)
else
''
mwb_info
=
request_data
[
'mwb_info'
]
if
request_data
.
get
(
'mwb_info'
)
else
''
if
not
master_waybill_no
:
res
[
'code'
]
=
1007
res
[
'msg'
]
=
'提单号必填'
if
not
mwb_info
:
res
[
'code'
]
=
1007
res
[
'msg'
]
=
'提单信息必传'
if
res
[
'code'
]
==
0
:
logging
.
info
(
'推入redis'
)
push_data
=
{
'type'
:
'mawb'
,
'result'
:
request_data
}
r_conn
.
lpush
(
'tiktok_parcel_data'
,
json
.
dumps
(
push_data
))
# return_res = rpc.customs_tiktok.mawb_declare(**request_data)
# if return_res:
# if return_res['err_msg']:
# res['code'] = 5000
# res['msg'] = return_res['err_msg']
# print(type(request_data))
data
=
request_data
result
[
'request_id'
]
=
res
[
'requestID'
]
result
[
'data'
]
=
data
provider_order_no
=
data
[
'orderNo'
]
if
data
.
get
(
'orderNo'
)
else
''
sequence
=
data
[
'sequence'
]
if
data
.
get
(
'sequence'
)
else
''
if
not
provider_order_no
:
res
[
'errorCode'
]
=
1007
res
[
'errorMsg'
]
=
'TEMU发货单号必填'
res
[
'success'
]
=
False
if
not
sequence
:
res
[
'errorCode'
]
=
1007
res
[
'errorMsg'
]
=
'版本号必填'
res
[
'success'
]
=
False
if
res
[
'errorCode'
]
==
0
:
# logging.info('推入redis')
# push_data = {'type': 'package', 'result': result}
# r_conn.lpush('tiktok_parcel_data', json.dumps(push_data))
# tiktok_package_declare.delay(**result)
return_res
=
rpc
.
temu_service
.
temu_create_order_service
(
**
result
)
if
return_res
:
if
return_res
[
'msg'
]:
res
[
'success'
]
=
False
res
[
'errorCode'
]
=
1008
res
[
'errorMsg'
]
=
return_res
[
'msg'
]
else
:
res
[
'result'
]
=
return_res
[
'result'
]
else
:
res
[
'
c
ode'
]
=
5000
res
[
'
m
sg'
]
=
'参数未传'
res
[
'
errorC
ode'
]
=
5000
res
[
'
errorM
sg'
]
=
'参数未传'
return
jsonify
(
res
)
@api.route
(
'/temu/query/order'
,
methods
=
[
'post'
])
# @check_customer
def
temu_query_order
():
# 接收提单的附件信息
"""查询箱贴"""
res
=
{
"code"
:
0
,
"msg"
:
"success"
,
"success"
:
True
,
"errorCode"
:
0
,
"errorMsg"
:
"success"
,
"requestID"
:
"202312251715021060522200417739B9"
,
"ts"
:
"2023-12-25 17:15:03"
"serverTimeMs"
:
int
(
time
.
time
()
*
1000
),
"result"
:
None
,
"sequence"
:
""
}
request_time
=
datetime
.
utcnow
()
timestamp
=
int
(
time
.
time
())
res
[
'ts'
]
=
request_time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
res
[
'requestID'
]
=
request_time
.
strftime
(
"
%
Y
%
m
%
d
%
H
%
M
%
S"
)
+
str
(
timestamp
)
request_data
=
request
.
form
[
'param_json'
]
_logger
.
info
(
'mawb_copy_upload kw:
%
s'
%
request_data
)
request_data
=
request
.
get_json
()
_logger
.
info
(
'temu_query_order kw:
%
s'
%
request_data
)
result
=
{}
if
request_data
:
request_data
=
json
.
loads
(
request_data
)
request_data
[
'request_id'
]
=
res
[
'requestID'
]
master_waybill_no
=
request_data
[
'master_waybill_no'
]
if
request_data
.
get
(
'master_waybill_no'
)
else
''
if
not
master_waybill_no
:
res
[
'code'
]
=
1007
res
[
'msg'
]
=
'提单号必填'
if
res
[
'code'
]
==
0
:
return_res
=
rpc
.
customs_tiktok
.
mawb_copy_upload
(
**
request_data
)
result
[
'request_id'
]
=
res
[
'requestID'
]
order_no
=
request_data
[
'orderNo'
]
result
[
'data'
]
=
request_data
logistics_order_no
=
request_data
[
'logisticsOrderNo'
]
version_no
=
request_data
[
'sequence'
]
carton_arr
=
request_data
[
'cartonInfo'
]
if
not
order_no
:
res
[
'errorCode'
]
=
1007
res
[
'errorMsg'
]
=
'TEMU发货单号必填'
res
[
'success'
]
=
False
if
not
version_no
:
res
[
'errorCode'
]
=
1007
res
[
'errorMsg'
]
=
'版本号必填'
res
[
'success'
]
=
False
if
res
[
'errorCode'
]
==
0
:
return_res
=
rpc
.
temu_service
.
temu_query_order_service
(
**
result
)
if
return_res
:
if
return_res
[
'msg'
]:
res
[
'code'
]
=
return_res
[
'code'
]
res
[
'msg'
]
=
return_res
[
'msg'
]
res
[
'errorCode'
]
=
1008
res
[
'errorMsg'
]
=
return_res
[
'msg'
]
res
[
'success'
]
=
False
else
:
res
[
'result'
]
=
return_res
[
'result'
]
res
[
'sequence'
]
=
return_res
[
'sequence'
]
else
:
res
[
'code'
]
=
5000
res
[
'msg'
]
=
'参数未传'
...
...
@@ -180,63 +209,31 @@ def temu_cancel_order():
timestamp
=
int
(
time
.
time
())
res
[
'ts'
]
=
request_time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
res
[
'requestID'
]
=
request_time
.
strftime
(
"
%
Y
%
m
%
d
%
H
%
M
%
S"
)
+
str
(
timestamp
)
request_data
=
request
.
form
[
'param_json'
]
request_data
=
request
.
get_json
()
_logger
.
info
(
'temu_cancel_order kw:
%
s'
%
request_data
)
result
=
{}
if
request_data
:
data
=
json
.
loads
(
request_data
)
result
[
'request_id'
]
=
res
[
'requestID'
]
result
[
'data'
]
=
data
provider_order_no
=
data
[
'orderNo'
]
if
data
.
get
(
'orderNo'
)
else
''
logistics_order_no
=
data
[
'logisticsOrderNo'
]
if
data
.
get
(
'logisticsOrderNo'
)
else
''
result
[
'data'
]
=
request_
data
provider_order_no
=
request_data
[
'orderNo'
]
if
request_
data
.
get
(
'orderNo'
)
else
''
logistics_order_no
=
request_data
[
'logisticsOrderNo'
]
if
request_
data
.
get
(
'logisticsOrderNo'
)
else
''
if
not
provider_order_no
:
res
[
'errorCode'
]
=
1007
res
[
'errorMsg'
]
=
'TEMU发货单号必填'
res
[
'success'
]
=
False
if
not
logistics_order_no
:
res
[
'errorCode'
]
=
1007
res
[
'errorMsg'
]
=
'服务商发货单号必填'
if
res
[
'code'
]
==
0
:
res
[
'success'
]
=
False
if
res
[
'errorCode'
]
==
0
:
return_res
=
rpc
.
temu_service
.
temu_cancel_order_service
(
**
result
)
if
return_res
:
if
return_res
[
'msg'
]:
res
[
'errorCode'
]
=
1008
res
[
'errorMsg'
]
=
return_res
[
'msg'
]
res
[
'success'
]
=
False
else
:
res
[
'code'
]
=
5000
res
[
'msg'
]
=
'参数未传'
return
jsonify
(
res
)
@api.route
(
'/logistics/provider/customs/mawb_cancel'
,
methods
=
[
'post'
])
# @check_customer
def
mawb_cancel
():
# 接收提单取消
res
=
{
"code"
:
0
,
"msg"
:
"success"
,
"requestID"
:
"202312251715021060522200417739B9"
,
"ts"
:
"2023-12-25 17:15:03"
}
request_time
=
datetime
.
utcnow
()
timestamp
=
int
(
time
.
time
())
res
[
'ts'
]
=
request_time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
res
[
'requestID'
]
=
request_time
.
strftime
(
"
%
Y
%
m
%
d
%
H
%
M
%
S"
)
+
str
(
timestamp
)
request_data
=
request
.
form
[
'param_json'
]
_logger
.
info
(
'mawb_cancel kw:
%
s'
%
request_data
)
if
request_data
:
request_data
=
json
.
loads
(
request_data
)
request_data
[
'request_id'
]
=
res
[
'requestID'
]
master_waybill_no
=
request_data
[
'master_waybill_no'
]
if
request_data
.
get
(
'master_waybill_no'
)
else
''
if
not
master_waybill_no
:
res
[
'code'
]
=
1007
res
[
'msg'
]
=
'提单号必传'
if
res
[
'code'
]
==
0
:
return_res
=
rpc
.
customs_tiktok
.
mawb_cancel
(
**
request_data
)
if
return_res
:
if
return_res
[
'msg'
]:
res
[
'code'
]
=
return_res
[
'code'
]
res
[
'msg'
]
=
return_res
[
'msg'
]
else
:
res
[
'code'
]
=
5000
res
[
'msg'
]
=
'参数未传'
return
jsonify
(
res
)
app/manage.py
浏览文件 @
351a9d6e
...
...
@@ -17,4 +17,4 @@ swagger_config['description'] = config.description
Swagger
(
app
)
CORS
(
app
,
supports_credentials
=
True
)
if
__name__
==
"__main__"
:
app
.
run
(
host
=
"0.0.0.0"
,
port
=
int
(
parse_args
.
port
),
debug
=
Tru
e
)
app
.
run
(
host
=
"0.0.0.0"
,
port
=
int
(
parse_args
.
port
),
debug
=
Fals
e
)
services/create_pdf.py
0 → 100644
浏览文件 @
351a9d6e
# !/usr/bin/python
# -*- coding:utf-8 -*-
import
ast
import
copy
import
json
import
sys
import
time
import
requests
import
logging
from
dependence.services.util
import
YhjCommon
,
Order_dispose
from
dependence.services
import
config
import
redis
logging
.
basicConfig
(
handlers
=
[
logging
.
FileHandler
(
'logs/create_pdf.log'
,
'a'
,
'utf-8'
)],
format
=
'
%(asctime)
s
%(levelname)
s
%(message)
s'
,
level
=
logging
.
INFO
)
_logger
=
logging
.
getLogger
(
__name__
)
# 初始化连接(建议放在外面,或者是单例)
odoo_conn
=
Order_dispose
()
class
CreateCartonPdf
(
object
):
def
start_worker
(
self
,
order_no
):
"""
启动守护进程循环
:param interval: 每次循环的休眠时间(秒)
"""
_logger
.
info
(
f
">>>订单{order_no} PDF 生成服务已启动 <<<"
)
try
:
# --- 执行业务逻辑 ---
# 调用 Odoo 端的方法 (假设 cron_create_pdf 内部会自动查找 draft 状态的记录并处理)
# 注意:这里保留了您原本的传参 ["self"],请确保 Odoo 端方法能接收此参数
result
=
odoo_conn
.
odoo_db
.
execute
(
"temu.order.carton"
,
"cron_create_pdf"
,
[
"self"
])
# 如果 Odoo 返回了处理的数量,可以打个日志
if
result
:
_logger
.
info
(
f
"本次执行结果: {result}"
)
except
Exception
as
err
:
# --- 异常处理 ---
# 捕获所有异常,确保脚本不会崩掉退出
_logger
.
error
(
f
"发生异常,将在 10秒 后重试。错误详情: {str(err)}"
)
time
.
sleep
(
1
)
# 出错后多睡一会儿,防止日志刷屏
try
:
pool
=
redis
.
ConnectionPool
(
**
config
.
redis_options
)
r
=
redis
.
Redis
(
connection_pool
=
pool
)
logging
.
info
(
u'redis连接成功'
)
pdf_obj
=
CreateCartonPdf
()
while
1
:
try
:
result
=
r
.
brpop
([
'create_pdf_data'
],
0
)
data1
=
result
[
1
]
task_data
=
json
.
loads
(
data1
)
if
result
:
pdf_obj
.
start_worker
(
task_data
.
get
(
'order_no'
))
else
:
logging
.
error
(
'未找到数据类型'
)
except
Exception
as
e
:
logging
.
error
(
'error:
%
s'
%
str
(
e
))
continue
except
Exception
as
e
:
logging
.
error
(
"登录失败:
%
s"
%
e
)
services/dependence/services/config.py
浏览文件 @
351a9d6e
...
...
@@ -27,7 +27,7 @@ db_password = "qq166349"
# odoorpc配置
rpc_db_ip
=
'127.0.0.1'
rpc_db_port
=
"8069"
rpc_db_name
=
"
hh_ccs_425
"
rpc_db_name
=
"
test_xqhwaybill
"
rpc_db_user
=
"admin"
rpc_db_password
=
"admin"
...
...
services/dependence/services/db_service.py
浏览文件 @
351a9d6e
...
...
@@ -43,8 +43,8 @@ class DbService(object):
username
=
config
.
db_user
,
password
=
config
.
db_password
,
host
=
config
.
db_ip
,
port
=
config
.
db_port
,
database
=
config
.
db_name
),
pool_size
=
15
,
max_overflow
=
5
,
pool_size
=
20
,
max_overflow
=
10
,
pool_recycle
=
3600
# 1小时回收连接,防止 MySQL/PG 断连
)
...
...
services/dependence/services/util.py
浏览文件 @
351a9d6e
...
...
@@ -205,9 +205,9 @@ class YhjCommon(object):
return
nTime
# class Order_dispose(object):
#
# def __init__(self):
# # rpc连接
# self.odoo_db = odoorpc.ODOO(config.rpc_db_ip, port=config.rpc_db_port)
# self.odoo_db.login(config.rpc_db_name, config.rpc_db_user, config.rpc_db_password)
\ No newline at end of file
class
Order_dispose
(
object
):
def
__init__
(
self
):
# rpc连接
self
.
odoo_db
=
odoorpc
.
ODOO
(
config
.
rpc_db_ip
,
port
=
config
.
rpc_db_port
)
self
.
odoo_db
.
login
(
config
.
rpc_db_name
,
config
.
rpc_db_user
,
config
.
rpc_db_password
)
services/logs/tiktok_custom.log
deleted
100644 → 0
浏览文件 @
e2472b49
services/logs/tiktok_queue.log
deleted
100644 → 0
浏览文件 @
e2472b49
This source diff could not be displayed because it is too large. You can
view the blob
instead.
services/supervisord_conf/conf.d/
tiktok_queue
.conf
→
services/supervisord_conf/conf.d/
create_pdf
.conf
浏览文件 @
351a9d6e
[
program
:
tiktok_queue
_1
]
[
program
:
create_pdf
_1
]
process_name
=%(
program_name
)
s_
%(
process_num
)
02
d
; 进程名称
directory
= /
mnt
/
extra
-
addons
; 程序的启动目录
command
= /
usr
/
local
/
bin
/
python3
/
mnt
/
extra
-
addons
/
tiktok_queue
.
py
; 启动命令
command
= /
usr
/
local
/
bin
/
python3
/
mnt
/
extra
-
addons
/
create_pdf
.
py
; 启动命令
autostart
=
true
; 在
supervisord
启动的时候也自动启动
startsecs
=
5
; 启动
5
秒后没有异常退出,就当作已经正常启动了
autorestart
=
true
; 程序异常退出后自动重启
startretries
=
3
; 启动失败自动重试次数,默认是
3
user
=
root
; 用哪个用户启动
numprocs
=
1
; 进程数
numprocs
=
1
; 进程数
redirect_stderr
=
true
; 把
stderr
重定向到
stdout
,默认
false
stdout_logfile_maxbytes
=
20
MB
;
stdout
日志文件大小,默认
50
MB
stdout_logfile_backups
=
20
;
stdout
日志文件备份数
;
stdout
日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(
supervisord
会自动创建日志文件)
stdout_logfile
= /
var
/
log
/
supervisor
/
tiktok_queue
.
log
\ No newline at end of file
stdout_logfile
= /
var
/
log
/
supervisor
/
create_pdf
.
log
\ No newline at end of file
services/supervisord_conf/conf.d/t
iktok
_service.conf
→
services/supervisord_conf/conf.d/t
emu
_service.conf
浏览文件 @
351a9d6e
[
program
:
t
iktok
_service_1
]
[
program
:
t
emu
_service_1
]
process_name
=%(
program_name
)
s_
%(
process_num
)
02
d
; 进程名称
directory
= /
mnt
/
extra
-
addons
; 程序的启动目录
command
=
nameko
run
t
iktok
_service
--
config
foobar
.
yaml
; 启动命令
command
=
nameko
run
t
emu
_service
--
config
foobar
.
yaml
; 启动命令
autostart
=
true
; 在
supervisord
启动的时候也自动启动
startsecs
=
5
; 启动
5
秒后没有异常退出,就当作已经正常启动了
autorestart
=
true
; 程序异常退出后自动重启
...
...
@@ -12,5 +12,5 @@ redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes
=
20
MB
;
stdout
日志文件大小,默认
50
MB
stdout_logfile_backups
=
20
;
stdout
日志文件备份数
;
stdout
日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(
supervisord
会自动创建日志文件)
stdout_logfile
= /
var
/
log
/
supervisor
/
t
iktok
_service
.
log
stdout_logfile
= /
var
/
log
/
supervisor
/
t
emu
_service
.
log
services/temu_service.py
浏览文件 @
351a9d6e
...
...
@@ -21,11 +21,14 @@ from random import Random
import
pandas
as
pd
from
nameko.rpc
import
rpc
,
RpcProxy
from
datetime
import
datetime
,
timedelta
,
timezone
from
psycopg2.extras
import
execute_values
# 批量插入的性能优化
from
dependence.services
import
config
from
dependence.services
import
db_handle
from
dependence.services.util
import
YhjCommon
from
dependence.services.redis_service
import
RedisClient
from
logging.handlers
import
RotatingFileHandler
import
zlib
import
base64
# from line_profiler import LineProfiler
# # 创建一个 LineProfiler 实例
# profiler = LineProfiler()
...
...
@@ -75,8 +78,7 @@ class TemuService(object):
cr
.
execute
(
"""
SELECT id, version_no, logistics_order_no, delivery_mode, deliver_method
FROM temu_order
WHERE temu_delivery_no =
%
s
FOR UPDATE
WHERE temu_delivery_no =
%
s
"""
,
(
order_no
,))
existing
=
cr
.
fetchone
()
order_id
=
None
...
...
@@ -84,14 +86,13 @@ class TemuService(object):
incoming_seq
=
int
(
kw
.
get
(
'sequence'
,
0
))
if
existing
:
order_id
,
current_seq
,
logistics_order_no
,
d_mode
,
d_method
=
existing
# === 场景 A: 版本过低 ===
current_seq
=
int
(
current_seq
)
if
incoming_seq
<
(
current_seq
or
0
):
return_res
[
'msg'
]
=
'下单失败:版本号低于当前系统版本'
return_res
[
'result'
]
=
{
'orderNo'
:
order_no
,
'logisticsOrderNo'
:
logistics_order_no
}
self
.
_log_api
(
cr
,
kw_data
,
order_no
,
return_res
[
'msg'
])
return
return_res
# === 场景 B: 版本一致 (直接返回) ===
elif
incoming_seq
==
(
current_seq
or
0
):
# 如果需要返回箱号,查一下
...
...
@@ -104,6 +105,7 @@ class TemuService(object):
'logisticsOrderNo'
:
logistics_order_no
,
'cartonInfo'
:
carton_res
}
self
.
_log_api
(
cr
,
kw_data
,
order_no
,
return_res
[
'msg'
])
return
return_res
# === 场景 C: 更新模式 ===
else
:
...
...
@@ -130,10 +132,15 @@ class TemuService(object):
c
.
get
(
'containerVolume'
),
c
.
get
(
'containerVolumeUnit'
)
)
for
c
in
containers
]
cr
.
executemany
(
"""
# cr.executemany("""
# INSERT INTO temu_order_container
# (order_id, container_type, container_weight, container_weight_unit, container_volume, container_volume_unit)
# VALUES (%s, %s, %s, %s, %s, %s)
# """, c_data)
execute_values
(
cr
,
"""
INSERT INTO temu_order_container
(order_id, container_type, container_weight, container_weight_unit, container_volume, container_volume_unit)
VALUES
(
%
s,
%
s,
%
s,
%
s,
%
s,
%
s)
VALUES
%
s
"""
,
c_data
)
# --- B. 处理大箱与SKU (Carton & SKU) ---
...
...
@@ -145,7 +152,7 @@ class TemuService(object):
for
c
in
cartons
:
# 收集大箱参数
carton_insert_values
.
append
((
order_id
,
c
[
'cartonNo'
],
c
.
get
(
'weight'
),
c
.
get
(
'weightUnit'
),
'draft'
,
order_id
,
c
[
'cartonNo'
],
c
.
get
(
'weight'
),
c
.
get
(
'weightUnit'
),
c
.
get
(
'length'
),
c
.
get
(
'width'
),
c
.
get
(
'height'
),
c
.
get
(
'lengthUnit'
),
c
.
get
(
'electrified'
)
))
...
...
@@ -157,74 +164,54 @@ class TemuService(object):
# ---------------------------------------------------------------
# 2. 批量插入大箱 (1次数据库交互)
# ---------------------------------------------------------------
response_cartons
=
[]
if
carton_insert_values
:
# 注意:executemany 不支持 RETURNING 返回结果集(在某些驱动下),
# 所以我们只负责插入,不在这里获取 ID
carton_gen_sql
=
"('CTN' || to_char(NOW(), 'YYYYMMDD') || lpad(nextval('seq_temu_carton_service')::text, 7, '0'))"
carton_sql
=
f
"""
insert_sql
=
"""
INSERT INTO temu_order_carton (
order_id, carton_no, service_
carton_no, weight, weight_unit, length, width, height,
length_unit, is_electrified, create_date
) VALUES (
%
s,
%
s, {carton_gen_sql},
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s, NOW()
)
pdf_state, order_id,
carton_no, weight, weight_unit, length, width, height,
length_unit, is_electrified, create_date
,
service_carton_no
) VALUES
%
s
RETURNING carton_no, id, service_carton_no
"""
cr
.
executemany
(
carton_sql
,
carton_insert_values
)
template
=
"""
(
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s,
%
s, NOW(),
(to_char(NOW(), 'YYMMDD') || lpad(nextval('seq_temu_carton_service')::text, 5, '0'))
)
"""
inserted_rows
=
execute_values
(
cr
,
insert_sql
,
carton_insert_values
,
template
=
template
,
fetch
=
True
,
page_size
=
1000
)
# ---------------------------------------------------------------
#
3. 批量回查 ID 并建立映射 (1次数据库交互)
#
4. 准备 SKU 的批量插入数据
# ---------------------------------------------------------------
# 核心技巧:利用 order_id 把刚才插入的所有箱子一次性查出来
# 假设 carton_no 在一个订单内是唯一的,我们可以用它做 Key
select_sql
=
"""
SELECT carton_no, id, service_carton_no
FROM temu_order_carton
WHERE order_id =
%
s
"""
cr
.
execute
(
select_sql
,
(
order_id
,))
# 建立映射字典: {'carton_no': db_id}
# 同时也收集需要返回给 API 的结果
carton_no_to_id_map
=
{}
response_cartons
=
[]
for
row
in
cr
.
fetchall
():
sku_insert_values
=
[]
# 遍历插入后的结果 (c_no, db_id, svc_c_no)
for
row
in
inserted_rows
:
c_no
,
db_id
,
svc_c_no
=
row
carton_no_to_id_map
[
c_no
]
=
db_id
# 收集返回给 Temu 的数据结构
# 构建 API 返回结构
response_cartons
.
append
({
'cartonNo'
:
c_no
,
'logisticsCartonNo'
:
svc_c_no
})
# ---------------------------------------------------------------
# 4. 准备 SKU 的批量插入数据
# ---------------------------------------------------------------
sku_insert_values
=
[]
for
c_no
,
sku_list
in
sku_temp_holder
.
items
():
# 通过映射找到数据库 ID
db_carton_id
=
carton_no_to_id_map
.
get
(
c_no
)
if
db_carton_id
:
for
sku
in
sku_list
:
# 匹配 SKU
if
c_no
in
sku_temp_holder
:
for
sku
in
sku_temp_holder
[
c_no
]:
sku_insert_values
.
append
((
order_id
,
db_carton_id
,
sku
[
'id'
],
sku
[
'quantity'
]
order_id
,
db_id
,
sku
[
'id'
],
sku
[
'quantity'
]
))
# ---------------------------------------------------------------
# 5. 批量插入 SKU (
1次数据库交互
)
# 5. 批量插入 SKU (
优化为 execute_values
)
# ---------------------------------------------------------------
if
sku_insert_values
:
sku_sql
=
"""
INSERT INTO temu_order_sku (order_id, carton_id, sku_id, quantity)
VALUES (
%
s,
%
s,
%
s,
%
s)
"""
cr
.
executemany
(
sku_sql
,
sku_insert_values
)
execute_values
(
cr
,
"""
INSERT INTO temu_order_sku (order_id, carton_id, sku_id, quantity)
VALUES
%
s
"""
,
sku_insert_values
,
page_size
=
1000
)
# -------------------------------------------------------
# 3. 构造返回
# -------------------------------------------------------
...
...
@@ -239,6 +226,7 @@ class TemuService(object):
'cartonInfo'
:
final_cartons
}
# 记录日志
redis_obj
.
lpush
(
'create_pdf_data'
,
json
.
dumps
({
'order_no'
:
order_no
}))
self
.
_log_api
(
cr
,
kw_data
,
order_no
,
""
)
except
Exception
as
e
:
...
...
@@ -246,13 +234,21 @@ class TemuService(object):
return_res
[
'msg'
]
=
f
"系统内部错误: {str(e)}"
return_res
[
'result'
]
=
{
'orderNo'
:
order_no
}
# 这里的 rollback 由 DbService 的 contextmanager 自动处理
# --- 新增部分:独立记录错误日志 ---
try
:
with
db_handle
.
get_cursor
()
as
log_cr
:
# 假设 _log_api 内部执行了 insert 语句
self
.
_log_api
(
log_cr
,
kw_data
,
order_no
,
return_res
[
'msg'
])
except
Exception
as
log_e
:
_logger
.
error
(
f
"写入错误日志失败: {str(log_e)}"
)
# profiler.print_stats()
return
return_res
# --- 辅助 SQL 方法 ---
def
_insert_order
(
self
,
cr
,
kw
):
""" 插入主表并返回 ID 和 生成的单号 """
logistics_no_sql
=
"(
'LGS' ||
to_char(NOW(), 'YYYYMMDD') || lpad(nextval('seq_temu_logistics_order')::text, 4, '0'))"
logistics_no_sql
=
"(to_char(NOW(), 'YYYYMMDD') || lpad(nextval('seq_temu_logistics_order')::text, 4, '0'))"
sql
=
f
"""
INSERT INTO temu_order (
...
...
@@ -303,7 +299,7 @@ class TemuService(object):
WHERE id=
%
s
"""
params
=
(
kw
.
get
(
'
orderNo'
),
kw
.
get
(
'
channelCode'
),
kw
.
get
(
'warehouse'
),
kw
.
get
(
'sequence'
),
kw
.
get
(
'deliverMethod'
),
kw
.
get
(
'channelCode'
),
kw
.
get
(
'warehouse'
),
kw
.
get
(
'sequence'
),
kw
.
get
(
'deliverMethod'
),
kw
.
get
(
'deliveryMode'
),
kw
.
get
(
'electrified'
),
kw
.
get
(
'predictCharge'
),
kw
.
get
(
'predictChargeCurrency'
),
kw
.
get
(
'exportLicenseSource'
),
kw
.
get
(
'companyInfo'
,
{})
.
get
(
'companyName'
),
kw
.
get
(
'companyInfo'
,
{})
.
get
(
'fullName'
),
kw
.
get
(
'companyInfo'
,
{})
.
get
(
'phone'
),
kw
.
get
(
'companyInfo'
,
{})
.
get
(
'email'
),
kw
.
get
(
'shippingInfo'
,
{})
.
get
(
'fullName'
),
kw
.
get
(
'shippingInfo'
,
{})
.
get
(
'regionName1'
),
kw
.
get
(
'shippingInfo'
,
{})
.
get
(
'regionName2'
),
kw
.
get
(
'shippingInfo'
,
{})
.
get
(
'regionName3'
),
kw
.
get
(
'shippingInfo'
,
{})
.
get
(
'regionName4'
),
...
...
@@ -316,13 +312,14 @@ class TemuService(object):
def
_log_api
(
self
,
cr
,
kw_data
,
order_no
,
error_msg
):
try
:
is_success
=
False
if
error_msg
else
True
cr
.
execute
(
"""
INSERT INTO temu_api_log (big_bag_no, error_msg, push_time, data_text, request_id, source)
VALUES (
%
s,
%
s, NOW(),
%
s,
%
s, '推入')
INSERT INTO temu_api_log (big_bag_no, error_msg, push_time, data_text, request_id, s
uccess_bl, s
ource)
VALUES (
%
s,
%
s, NOW(),
%
s,
%
s,
%
s,
'推入')
"""
,
(
order_no
,
error_msg
,
json
.
dumps
(
kw_data
.
get
(
'data'
,
{})),
kw_data
.
get
(
'request_id'
)
kw_data
.
get
(
'request_id'
)
,
is_success
))
except
Exception
:
pass
# 日志错误不影响主流程
...
...
@@ -337,744 +334,182 @@ class TemuService(object):
print
(
"Parsed datetime (UTC):"
,
dt_utc
)
return
dt_utc
# @rpc
# def temu_create_order_service(self, **kw_data):
# # 接收temu订单信息
# return_res = {
# "msg": '',
# "result": {}
# }
# order_no = ''
# kw = {}
# try:
# # 获取订单号
# # 查询订单是否存在系统 不存在则创建
# # 存在 则判断版本号 版本号低于订单的版本号返回失败 高于则更新订单
# kw = kw_data['data']
# order_no = kw['orderNo']
# # 传入的数据
# vals = {
# "temu_delivery_no": kw['orderNo'],
# "channel_code": kw['channelCode'],
# "warehouse_name": kw.get('warehouse'),
# "version_no": kw['sequence'],
# "deliver_method": kw['deliverMethod'],
# "delivery_mode": kw['deliveryMode'],
# "is_electrified": kw['electrified'],
# "predict_charge": kw.get('predictCharge'),
# "predict_charge_currency": kw.get('predictChargeCurrency'),
#
# "export_license_source": kw['exportLicenseSource'],
# # 公司信息
# "company_name": kw.get('companyInfo', {}).get('companyName', ''),
# "company_contact_name": kw.get('companyInfo', {}).get('fullName', ''),
# "company_phone": kw.get('companyInfo', {}).get('phone', ''),
# "company_email": kw.get('companyInfo', {}).get('email', ''),
# # 发货仓
# "ship_contact_name": kw.get('shippingInfo', {}).get('fullName', ''),
# "ship_region1": kw.get('shippingInfo', {}).get('regionName1', ''),
# "ship_region2": kw.get('shippingInfo', {}).get('regionName2', ''),
# "ship_region3": kw.get('shippingInfo', {}).get('regionName3', ''),
# "ship_region4": kw.get('shippingInfo', {}).get('regionName4', ''),
# "ship_detail_address": kw.get('shippingInfo', {}).get('detailedAddress', ''),
# "ship_email": kw.get('shippingInfo', {}).get('email', ''),
# "ship_phone": kw.get('shippingInfo', {}).get('phone', ''),
# "ship_postcode": kw.get('shippingInfo', {}).get('postcode', ''),
# # 目的仓
# "dest_contact_name": kw.get('destinationInfo', {}).get('fullName', ''),
# "dest_region1": kw.get('destinationInfo', {}).get('regionName1', ''),
# "dest_region2": kw.get('destinationInfo', {}).get('regionName2', ''),
# "dest_region3": kw.get('destinationInfo', {}).get('regionName3', ''),
# "dest_region4": kw.get('destinationInfo', {}).get('regionName4', ''),
# "dest_detail_address": kw.get('destinationInfo', {}).get('detailedAddress', ''),
# "dest_email": kw.get('destinationInfo', {}).get('email', ''),
# "dest_phone": kw.get('destinationInfo', {}).get('phone', ''),
# "dest_postcode": kw.get('destinationInfo', {}).get('postcode', ''),
# "dest_warehouse_no": kw.get('destinationInfo', {}).get('warehouseNo', ''),
# "carton_count": len(kw.get('containerInfo', []))
# }
# sql = "select id,logistics_order_no,version_no,delivery_mode,deliver_method from temu_order where temu_delivery_no=%s"
# temu_order_sql_result = pd.read_sql(sql, con=db_handle, params=(order_no, ))
# temu_order_id = ''
# temu_order_version_no = 0
# temu_logistics_order_no = ''
# temu_delivery_mode = ''
# temu_deliver_method = ''
# for temu_res in temu_order_sql_result.itertuples():
# temu_order_id = temu_res.id
# temu_order_version_no = temu_res.version_no
# temu_logistics_order_no = temu_res.logistics_order_no
# temu_delivery_mode = temu_res.delivery_mode
# temu_deliver_method = temu_res.deliver_method
# if temu_order_id: # 存在
# # 判断版本号
# result = {
# 'orderNo': order_no,
# 'logisticsOrderNo': temu_logistics_order_no,
# # 'cartonInfo': [{
# # "cartonNo": "",
# # "logisticsCartonNo": "",
# # }]
# }
# if vals['version_no'] < temu_order_version_no:
# return_res['msg'] = '下单失败:版本号低于当前系统版本'
# return_res['result'] = result
# elif vals['version_no'] == temu_order_version_no:
# carton_info_arr = []
# if temu_delivery_mode != '1' or temu_deliver_method != '3':
# sql = "select carton_no,service_carton_no from temu_order_carton where order_id=%s"
# order_carton_result = pd.read_sql(sql, con=db_handle, params=(temu_order_id, ))
# for carton_res in order_carton_result.itertuples():
# carton_info_arr.append({
# 'cartonNo': carton_res.carton_no,
# 'logisticsCartonNo': carton_res.service_carton_no,
# })
# result['cartonInfo'] = carton_info_arr
# return_res['result'] = result
# else:
# # 更新订单数据
# update_query = """
# UPDATE temu_order
# SET
# channel_code = %s,
# warehouse_name = %s,
# version_no = %s,
# deliver_method = %s,
# delivery_mode = %s,
# is_electrified = %s,
# predict_charge = %s,
# predict_charge_currency = %s,
# export_license_source = %s,
#
# -- 公司信息
# company_name = %s,
# company_contact_name = %s,
# company_phone = %s,
# company_email = %s,
#
# -- 发货仓信息
# ship_contact_name = %s,
# ship_region1 = %s,
# ship_region2 = %s,
# ship_region3 = %s,
# ship_region4 = %s,
# ship_detail_address = %s,
# ship_email = %s,
# ship_phone = %s,
# ship_postcode = %s,
#
# -- 目的仓信息
# dest_contact_name = %s,
# dest_region1 = %s,
# dest_region2 = %s,
# dest_region3 = %s,
# dest_region4 = %s,
# dest_detail_address = %s,
# dest_email = %s,
# dest_phone = %s,
# dest_postcode = %s,
# dest_warehouse_no = %s,
#
# WHERE id = %s
# """
# params = (
# vals.get('channel_code'),
# vals.get('warehouse_name'),
# vals.get('version_no'),
# vals.get('deliver_method'),
# vals.get('delivery_mode'),
# vals.get('is_electrified'),
# vals.get('predict_charge'),
# vals.get('predict_charge_currency'),
# vals.get('export_license_source'),
#
# # 公司信息
# vals.get('company_name'),
# vals.get('company_contact_name'),
# vals.get('company_phone'),
# vals.get('company_email'),
#
# # 发货仓
# vals.get('ship_contact_name'),
# vals.get('ship_region1'),
# vals.get('ship_region2'),
# vals.get('ship_region3'),
# vals.get('ship_region4'),
# vals.get('ship_detail_address'),
# vals.get('ship_email'),
# vals.get('ship_phone'),
# vals.get('ship_postcode'),
#
# # 目的仓
# vals.get('dest_contact_name'),
# vals.get('dest_region1'),
# vals.get('dest_region2'),
# vals.get('dest_region3'),
# vals.get('dest_region4'),
# vals.get('dest_detail_address'),
# vals.get('dest_email'),
# vals.get('dest_phone'),
# vals.get('dest_postcode'),
# vals.get('dest_warehouse_no'),
#
# temu_order_id
# )
# pd.read_sql(update_query, con=db_handle, params=params, chunksize=100) # 更新temu_order的sql语句
# delete_sku_sql = 'delete from temu_order_sku where order_id=%s' % temu_order_id
# pd.read_sql(delete_sku_sql, con=db_handle, chunksize=100)
# delete_container_sql = 'delete from temu_order_container where order_id=%s' % temu_order_id
# pd.read_sql(delete_container_sql, con=db_handle, chunksize=100)
# delete_carton_sql = 'delete from temu_order_carton where order_id=%s' % temu_order_id
# pd.read_sql(delete_carton_sql, con=db_handle, chunksize=100)
# # 创建柜
# container_arr = kw.get('containerInfo', [])
# create_container_arr = []
# for container_data in container_arr:
# create_container_arr.append({
# "order_id": temu_order_id,
# "container_type": container_data['containerType'],
# "container_weight": container_data['containerWeight'],
# "container_weight_unit": container_data['containerWeightUnit'],
# "container_volume": container_data['containerVolume'],
# "container_volume_unit": container_data['containerVolumeUnit'],
# })
# if create_container_arr:
# # 创建柜信息
# val_df = pd.DataFrame(create_container_arr)
# val_df.to_sql('temu_order_container', con=db_handle, if_exists='append', index=False)
# # 创建大箱
# create_carton_no_arr = []
# # values_str = ''
# carton_arr = kw.get('cartonInfo', [])
# create_carton_arr = []
# create_sku_arr = []
# for carton_data in carton_arr:
# carton_no = carton_data['cartonNo']
# create_carton_no_arr.append(carton_no)
# create_carton_arr.append({
# 'order_id': temu_order_id,
# 'carton_no': carton_no,
# 'weight': carton_data['weight'],
# 'weight_unit': carton_data['weightUnit'],
# 'length': carton_data['length'],
# 'width': carton_data['width'],
# 'height': carton_data['height'],
# 'length_unit': carton_data['lengthUnit'],
# 'is_electrified': carton_data['electrified'],
# })
# sku_arr = carton_data.get('skuInfo', [])
# for sku_data in sku_arr:
# create_sku_arr.append({
# 'carton_no': carton_no,
# 'sku_id': sku_data['id'],
# 'quantity': sku_data['quantity'],
# })
# return_carton_arr = []
# if create_carton_arr:
# carton_no_id_res = {}
# val_df = pd.DataFrame(create_carton_arr)
# val_df.to_sql('temu_order_carton', con=db_handle, if_exists='append', index=False)
# # 获取创建后 大箱单号对应的id,bl_id
# carton_no_str = '(%s)' % str(create_carton_no_arr)[1:-1]
# sql = 'select id,carton_no,service_carton_no,order_id from temu_order_carton where carton_no in %s' % carton_no_str
# new_order_carton_arr = pd.read_sql(sql, con=db_handle)
# for new_carton_data in new_order_carton_arr.itertuples():
# carton_no_id_res[new_carton_data.carton_no] = (new_carton_data.id, new_carton_data.order_id)
# return_carton_arr.append({
# 'cartonNo': new_carton_data.carton_no,
# 'logisticsCartonNo': new_carton_data.service_carton_no,
# })
# if create_sku_arr:
# for create_sku_vals in create_sku_arr:
# carton_no = create_sku_vals['carton_no']
# data_list = carton_no_id_res.get(carton_no, [])
# create_sku_vals['carton_id'] = data_list[0] if data_list else None
# create_sku_vals['order_id'] = data_list[1] if data_list else None
# val_df = pd.DataFrame(create_sku_arr)
# val_df.to_sql('temu_order_sku', con=db_handle, if_exists='append', index=False)
# if vals['deliver_method'] != '3' or vals['delivery_mode'] != '1':
# result['cartonInfo'] = return_carton_arr
# return_res['result'] = result
# else:
# # 创建
# val_df = pd.DataFrame(vals, index=[0])
# val_df.to_sql('temu_order', con=db_handle, if_exists='append', index=False)
# sql = "select id,logistics_order_no from temu_order where temu_delivery_no='%s';" % vals['temu_delivery_no']
# new_order = pd.read_sql(sql, con=db_handle)
# logging.info("new_order:%s" % len(new_order))
# result_id = new_order.to_dict()['id'][0]
# logistics_order_no = new_order.to_dict()['logistics_order_no'][0]
# result = {
# 'orderNo': order_no,
# 'logisticsOrderNo': logistics_order_no,
# # 'cartonInfo': [{
# # "cartonNo": "",
# # "logisticsCartonNo": "",
# # }]
# }
# # 创建柜
# container_arr = kw.get('containerInfo', [])
# create_container_arr = []
# for container_data in container_arr:
# create_container_arr.append({
# "order_id": result_id,
# "container_type": container_data['containerType'],
# "container_weight": container_data['containerWeight'],
# "container_weight_unit": container_data['containerWeightUnit'],
# "container_volume": container_data['containerVolume'],
# "container_volume_unit": container_data['containerVolumeUnit'],
#
# })
# if create_container_arr:
# # 创建柜信息
# val_df = pd.DataFrame(create_container_arr)
# val_df.to_sql('temu_order_container', con=db_handle, if_exists='append', index=False)
# # 创建大箱
# create_carton_no_arr = []
# # values_str = ''
# carton_arr = kw.get('cartonInfo', [])
# create_carton_arr = []
# create_sku_arr = []
# for carton_data in carton_arr:
# carton_no = carton_data['cartonNo']
# create_carton_no_arr.append(carton_no)
# create_carton_arr.append({
# 'order_id': result_id,
# 'carton_no': carton_no,
# 'weight': carton_data['weight'],
# 'weight_unit': carton_data['weightUnit'],
# 'length': carton_data['length'],
# 'width': carton_data['width'],
# 'height': carton_data['height'],
# 'length_unit': carton_data['lengthUnit'],
# 'is_electrified': carton_data['electrified'],
# })
# sku_arr = carton_data.get('skuInfo', [])
# for sku_data in sku_arr:
# create_sku_arr.append({
# 'carton_no': carton_no,
# 'sku_id': sku_data['id'],
# 'quantity': sku_data['quantity'],
# })
# return_carton_arr = []
# if create_carton_arr:
# carton_no_id_res = {}
# val_df = pd.DataFrame(create_carton_arr)
# val_df.to_sql('temu_order_carton', con=db_handle, if_exists='append', index=False)
# # 获取创建后 大箱单号对应的id,bl_id
# carton_no_str = '(%s)' % str(create_carton_no_arr)[1:-1]
# sql = 'select id,carton_no,service_carton_no,order_id from temu_order_carton where carton_no in %s' % carton_no_str
# new_order_carton_arr = pd.read_sql(sql, con=db_handle)
# for new_carton_data in new_order_carton_arr.itertuples():
# carton_no_id_res[new_carton_data.carton_no] = (new_carton_data.id, new_carton_data.order_id)
# return_carton_arr.append({
# 'cartonNo': new_carton_data.carton_no,
# 'logisticsCartonNo': new_carton_data.service_carton_no,
# })
# if create_sku_arr:
# for create_sku_vals in create_sku_arr:
# carton_no = create_sku_vals['carton_no']
# data_list = carton_no_id_res.get(carton_no, [])
# create_sku_vals['carton_id'] = data_list[0] if data_list else None
# create_sku_vals['order_id'] = data_list[1] if data_list else None
# val_df = pd.DataFrame(create_sku_arr)
# val_df.to_sql('temu_order_sku', con=db_handle, if_exists='append', index=False)
# if vals['deliver_method'] != '3' or vals['delivery_mode'] != '1':
# result['cartonInfo'] = return_carton_arr
# return_res['result'] = result
# except Exception as err:
# return_res['msg'] = str(err)
# result = {'orderNo': order_no}
# return_res['result'] = result
# _logger.error('temu_create_order_service error:%s' %
# str(err))
# log_val = {
# 'big_bag_no': order_no,
# 'error_msg': return_res['msg'],
# 'push_time': datetime.utcnow(),
# 'data_text': json.dumps(kw),
# 'success_bl': return_res['msg'],
# 'request_id': kw_data['request_id'],
# 'source': '推入',
# }
# val_df = pd.DataFrame(log_val, index=[0])
# val_df.to_sql('temu_api_log', con=db_handle, if_exists='append', index=False)
# return return_res
# @rpc
# def temu_create_order_service(self, **kw):
# # 接收清关包裹信息
# msg = ''
# return_res = {
# "all_result": True,
# "failed_provider_order_ids": [],
# "err_msg": {},
# }
# logistic_order_no = ''
# data_text = ''
# utc_time = ''
# try:
# push_interval_15_days = 15
# tt_customer_id = None
# # 查询过滤最近天数 查询默认客户
# sql = "select key,value from ir_config_parameter where key='tk_push_interval_15_days' or key='tt_customer_id'"
# result_arr = pd.read_sql(sql, con=db_handle)
# for res in result_arr.itertuples():
# if res.key == 'tk_push_interval_15_days':
# push_interval_15_days = int(res.value)
# elif res.key == 'tt_customer_id':
# tt_customer_id = int(res.value)
# # 查询默认小包状态
# node_id = None
# sql = "select id from cc_node where node_type='package' and is_default=True limit 1"
# node_arr = pd.read_sql(sql, con=db_handle)
# for node_res in node_arr.itertuples():
# node_id = int(node_res.id)
# parcel_arr = kw['packages']
# date = datetime.now() - timedelta(days=int(push_interval_15_days))
# create_package_arr = [] # 创建小包vals数据
# delete_goods_ids = [] # 删除的小包商品
# package_no_id_res = {} # 维护小包号和id的对应
# create_package_no_arr = [] # 创建的小包号列表
# package_goods_vals_arr = [] # 创建的小包商品vals数据
# data_text = json.dumps(parcel_arr)
# utc_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
# for package in parcel_arr:
# try:
# logistic_order_no = package.get('provider_order_id')
# sql = "select id,bl_id,is_cancel from cc_ship_package where logistic_order_no=%s and create_date >= %s"
# parcel_sql_result = pd.read_sql(sql, con=db_handle, params=(logistic_order_no, date))
# parcel_id = ''
# parcel_bl_id = ''
# for parcel_res in parcel_sql_result.itertuples():
# parcel_id = parcel_res.id
# parcel_bl_id = parcel_res.bl_id
# parcel_is_cancel = parcel_res.is_cancel
# if (parcel_id and (not parcel_bl_id or
# parcel_is_cancel)) or not parcel_id:
# ship_package_vals = dict(create_date=utc_time,state=node_id,
# is_cancel=False, cancel_reason='',
# logistic_order_no=package.get(
# 'provider_order_id'),
# tracking_no=package.get(
# 'tracking_no'),
# customer_ref=package.get(
# 'declaretion_bill_id'),
# internal_account_number="",
# user_track_note=package.get(
# 'remark'),
# company_code=package.get(
# 'entity_code'),
# trade_no=package.get(
# 'order_no'),
# # 需要将时间搓转换为时间
# operation_time=datetime.fromtimestamp(
# int(package.get('operate_time')) / 1000).strftime(
# '%Y-%m-%d %H:%M:%S'),
# big_package_no=package.get(
# 'big_bag_no'),
# container_no=package.get(
# 'container_no'),
# buyer_region=package.get(
# 'buyer_region'),
# next_provider_name=package.get(
# 'next_provider_name'),
# sender_name=package.get(
# 'sender_info').get('name'),
# sender_vat_no=package.get(
# 'sender_info').get('shipping_tax_id'),
# sender_phone=package.get(
# 'sender_info').get('phone'),
# sender_country=package.get('sender_info').get('address').get(
# 'address_l0'),
# sender_state=package.get('sender_info').get('address').get(
# 'address_l1'),
# sender_city=package.get('sender_info').get(
# 'address').get('address_l2'),
# sender_add_1=package.get('sender_info').get('address').get(
# 'address_l3'),
# sender_add_2=package.get('sender_info').get('address').get(
# 'address_l4'),
# sender_add_3=package.get('sender_info').get(
# 'address').get('details'),
# sender_postcode=package.get(
# 'sender_info').get('postcode'),
# receiver_name=package.get(
# 'receiver_info').get('name'),
# receiver_phone=package.get(
# 'receiver_info').get('phone'),
# receiver_postcode=package.get(
# 'receiver_info').get('postcode'),
# receiver_add_1=package.get('receiver_info').get('address').get(
# 'address_l0'),
# receiver_add_2=package.get('receiver_info').get('address').get(
# 'address_l1'),
# receiver_add_3=package.get('receiver_info').get('address').get(
# 'address_l2'),
# receiver_city=package.get('receiver_info').get('address').get(
# 'address_l3'),
# receiver_county=package.get('receiver_info').get('address').get(
# 'address_l4'),
# receiver_detailed_address=package.get(
# 'receiver_info').get('address').get('details'),
# receiver_vat_no=package.get(
# 'receiver_info').get('tax_id'),
# currency=package.get(
# 'currency'),
# gross_weight=package.get(
# 'package').get('real_weight'),
# weight_unit=package.get(
# 'package').get('weight_unit'),
# total_value=package.get(
# 'value').get('goods_value'),
# customer_id=tt_customer_id, is_sync=True) # 增加客户信息
# if package.get('items') and len(package.get('items')) > 0:
#
# good_id_arr = []
# for item in package.get('items'):
# item_id = item.get('item_id')
# if item_id not in good_id_arr:
# good_id_arr.append(item_id)
# package_good = dict(create_date=utc_time,bl_line_id=logistic_order_no,
# bl_id=False,
# item_id=item_id,
# sku_id=item.get(
# 'sku_id'),
# item_name_cn=item.get(
# 'product_name_cn'),
# item_name_en=item.get(
# 'product_name'),
# export_hs_code=item.get(
# 'export_hscode'),
# import_hs_code=item.get(
# 'import_hscode'),
# weight=item.get(
# 'weight') or None,
# quantity=item.get(
# 'qty'),
# quantity_unit=item.get(
# 'unit'),
# declare_price=item.get(
# 'unit_price') or None,
# freight=item.get(
# 'shipping_fee') or None,
# cod_amount=item.get(
# 'cod_fee') or None,
# vat_rate=item.get(
# 'vat_rate') or None,
# item_vat=item.get(
# 'vat_fee') or None,
# origin_country=item.get(
# 'origin_country'),
# item_type=item.get(
# 'item_type') or None,
# item_total_price=item.get(
# 'unit_price') or None,
# item_link=item.get(
# 'item_url'),
# item_tax_status=item.get('tax_mark') or None,)
# # _logger.info('package_good:%s' % package_good)
# if package_good:
# package_goods_vals_arr.append(package_good)
# if not parcel_id:
# # 创建小包
# create_package_no_arr.append(logistic_order_no)
# create_package_arr.append(ship_package_vals)
# else:
# # 删除小包商品
# package_no_id_res[logistic_order_no] = (parcel_id, parcel_bl_id) # 小包号对应id
# sql = 'select id from cc_package_good where bl_line_id = %s'
# delete_good_result = pd.read_sql(sql, con=db_handle, params=(parcel_id,))
# origin_goods_ids = delete_good_result['id'].tolist()
# delete_goods_ids += origin_goods_ids
# # 更新小包
# update_sql = """
# UPDATE cc_ship_package
# SET
# is_cancel = %s,
# cancel_reason = %s,
# logistic_order_no = %s,
# tracking_no = %s,
# customer_ref = %s,
# internal_account_number = %s,
# user_track_note = %s,
# company_code = %s,
# trade_no = %s,
# operation_time = %s,
# big_package_no = %s,
# container_no = %s,
# buyer_region = %s,
# next_provider_name = %s,
# sender_name = %s,
# sender_vat_no = %s,
# sender_phone = %s,
# sender_country = %s,
# sender_state = %s,
# sender_city = %s,
# sender_add_1 = %s,
# sender_add_2 = %s,
# sender_add_3 = %s,
# sender_postcode = %s,
# receiver_name = %s,
# receiver_phone = %s,
# receiver_postcode = %s,
# receiver_add_1 = %s,
# receiver_add_2 = %s,
# receiver_add_3 = %s,
# receiver_city = %s,
# receiver_county = %s,
# receiver_detailed_address = %s,
# receiver_vat_no = %s,
# currency = %s,
# gross_weight = %s,
# weight_unit = %s,
# total_value = %s,
# customer_id = %s
# WHERE id = %s;
#
# """
# # 构建参数列表
# params = (
# False, # is_cancel
# '', # cancel_reason
# ship_package_vals['logistic_order_no'],
# ship_package_vals['tracking_no'],
# ship_package_vals['customer_ref'],
# '', # internal_account_number
# ship_package_vals['user_track_note'],
# ship_package_vals['company_code'],
# ship_package_vals['trade_no'],
# ship_package_vals['operation_time'],
# ship_package_vals['big_package_no'],
# ship_package_vals['container_no'],
# ship_package_vals['buyer_region'],
# ship_package_vals['next_provider_name'],
# ship_package_vals['sender_name'],
# ship_package_vals['sender_vat_no'],
# ship_package_vals['sender_phone'],
# ship_package_vals['sender_country'],
# ship_package_vals['sender_state'],
# ship_package_vals['sender_city'],
# ship_package_vals['sender_add_1'],
# ship_package_vals['sender_add_2'],
# ship_package_vals['sender_add_3'],
# ship_package_vals['sender_postcode'],
# ship_package_vals['receiver_name'],
# ship_package_vals['receiver_phone'],
# ship_package_vals['receiver_postcode'],
# ship_package_vals['receiver_add_1'],
# ship_package_vals['receiver_add_2'],
# ship_package_vals['receiver_add_3'],
# ship_package_vals['receiver_city'],
# ship_package_vals['receiver_county'],
# ship_package_vals['receiver_detailed_address'],
# ship_package_vals['receiver_vat_no'],
# ship_package_vals['currency'],
# ship_package_vals['gross_weight'],
# ship_package_vals['weight_unit'],
# ship_package_vals['total_value'],
# tt_customer_id,
# parcel_id # id
# )
# pd.read_sql(update_sql, con=db_handle, params=params, chunksize=100)
# except Exception as err:
# return_res['all_result'] = False
# return_res['failed_provider_order_ids'].append(
# package.get('provider_order_id'))
# return_res['err_msg'].update(
# {package.get('provider_order_id'): str(err)})
# msg = str(err)
# _logger.error('package_declare error:%s' %
# str(err))
# # 创建小包
# # print('create_package_arr', create_package_arr)
# if create_package_arr:
# val_df = pd.DataFrame(create_package_arr)
# val_df.to_sql('cc_ship_package', con=db_handle, if_exists='append', index=False)
# # 获取创建后 小包单号对应的id,bl_id
# package_no_str = '(%s)' % str(create_package_no_arr)[1:-1]
# sql = 'select id,logistic_order_no,bl_id from cc_ship_package where logistic_order_no in %s' % package_no_str
# new_order_arr = pd.read_sql(sql, con=db_handle)
# for new_order_data in new_order_arr.itertuples():
# package_no_id_res[new_order_data.logistic_order_no] = (new_order_data.id, new_order_data.bl_id)
# if delete_goods_ids: # 删除小包商品
# ids = '(%s)' % str(delete_goods_ids)[1:-1]
# delete_goods_sql = 'delete from cc_package_good where id in %s' % ids
# pd.read_sql(delete_goods_sql, con=db_handle, chunksize=100)
# if package_goods_vals_arr:
# for package_goods_vals in package_goods_vals_arr:
# logistic_order_no = package_goods_vals['bl_line_id']
# data_list = package_no_id_res.get(logistic_order_no, [])
# package_goods_vals['bl_line_id'] = data_list[0] if data_list else None
# package_goods_vals['bl_id'] = data_list[1] if data_list else None
# val_df = pd.DataFrame(package_goods_vals_arr)
# val_df.to_sql('cc_package_good', con=db_handle, if_exists='append', index=False)
# except Exception as err:
# return_res['all_result'] = False
# return_res['failed_provider_order_ids'].append(
# logistic_order_no)
# return_res['err_msg'].update(
# {logistic_order_no: str(err)})
# _logger.error('package_declare error:%s' %
# str(err))
# msg = str(err)
# val = {
# 'big_bag_no': logistic_order_no,
# 'error_msg': msg,
# 'push_time': utc_time,
# 'data_text': data_text,
# 'success_bl': return_res['all_result'],
# 'request_id': kw['request_id'],
# 'source': '推入',
# 'create_date': datetime.utcnow()
# }
# val_df = pd.DataFrame(val, index=[0])
# val_df.to_sql('ao_tt_api_log', con=db_handle, if_exists='append', index=False)
# return return_res
@rpc
def temu_cancel_order_service(self, **kws):
    """Handle a Temu order-cancel push.

    Looks up the order by (temu_delivery_no, logistics_order_no), marks it
    cancelled, and writes an audit row to temu_api_log.

    Args:
        kws: RPC payload; 'data' holds orderNo/logisticsOrderNo,
             'request_id' is the upstream trace id.
    Returns:
        dict with 'msg' == '' on success, otherwise the failure reason
        (plus 'code': 500 on unexpected exceptions).
    """
    return_res = {"msg": ''}
    kw = kws.get('data', {})
    order_no = kw.get('orderNo', '')
    logistics_order_no = kw.get('logisticsOrderNo', '')
    utc_time = datetime.utcnow()
    # BUGFIX: defined OUTSIDE the try block.  Previously log_sql was assigned
    # late inside the try, so an early failure made the except branch raise
    # NameError while trying to write the failure audit row.
    log_sql = """
        INSERT INTO temu_api_log (
            big_bag_no, error_msg, push_time, data_text,
            success_bl, request_id, source, create_date
        ) VALUES (%s, %s, %s, %s, %s, %s, '推入', NOW())
    """
    try:
        # Context manager opens a transaction, commits on success and
        # rolls back automatically on exception.
        with db_handle.get_cursor() as cr:
            # FOR UPDATE locks the row so a concurrent ship/cancel on the
            # same order cannot interleave with this transaction (the old
            # comment promised this lock but the SQL never took it).
            sql = """
                SELECT id
                FROM temu_order
                WHERE temu_delivery_no = %s AND logistics_order_no = %s
                FOR UPDATE
            """
            cr.execute(sql, (order_no, logistics_order_no))
            row = cr.fetchone()
            if row:
                temu_order_id = row[0]
                update_cancel_sql = """
                    UPDATE temu_order
                    SET state='cancel', cancel_date=%s, write_date=NOW()
                    WHERE id=%s
                """
                cr.execute(update_cancel_sql, (utc_time, temu_order_id))
            else:
                return_res['msg'] = '系统未查询到订单'
            is_success = not return_res['msg']
            log_params = (
                f'Temu订单取消: {order_no}',
                return_res['msg'],
                utc_time,              # push_time
                json.dumps(kw),        # data_text
                is_success,            # success_bl
                kws.get('request_id')  # request_id
            )
            cr.execute(log_sql, log_params)
    except Exception as err:
        # The cursor context manager already rolled back; just report.
        return_res['code'] = 500
        return_res['msg'] = str(err)
        _logger.exception(f'temu_cancel_order_service error: {str(err)}')
        # Open a FRESH transaction solely for the failure audit row.
        try:
            with db_handle.get_cursor() as log_cr:
                log_params = (
                    f'Temu订单取消: {order_no}',
                    f"系统异常: {str(err)}",   # record the concrete failure
                    utc_time,
                    json.dumps(kw),
                    False,
                    kws.get('request_id')
                )
                log_cr.execute(log_sql, log_params)
                # the with-block commits this new transaction on exit
        except Exception as log_err:
            # Never let audit logging mask the original error.
            _logger.error(f"写入错误日志失败: {str(log_err)}")
    return return_res
@rpc
def temu_query_order_service(self, **kws):
    """Query carton labels (箱贴) for a Temu order.

    Matches the order by (temu_delivery_no, logistics_order_no, version_no),
    then for each requested carton decodes + zlib-decompresses the stored
    label blob and returns it as a plain base64 PDF string.

    Args:
        kws: RPC payload; 'data' holds orderNo/logisticsOrderNo/sequence/
             cartonInfo, 'request_id' is the upstream trace id.
    Returns:
        dict with 'msg', 'result' (order + cartonInfo list) and 'sequence'.
    """
    return_res = {"msg": '', 'result': {}, "sequence": ''}
    order_no = ''
    kw = {}
    # BUGFIX: defined before the try block so the except branch can always
    # reference it; previously an early failure raised NameError on log_sql.
    log_sql = """
        INSERT INTO temu_api_log (
            big_bag_no, error_msg, push_time, data_text,
            success_bl, request_id, source, create_date
        ) VALUES (%s, %s, NOW(), %s, %s, %s, '推入', NOW())
    """
    try:
        # replaced bare print() with proper service logging
        _logger.info('temu_query_order_service kws: %s', kws)
        kw = kws['data']
        order_no = kw['orderNo']
        logistics_order_no = kw['logisticsOrderNo']
        version_no = kw['sequence']
        carton_arr = kw['cartonInfo']
        # Context manager: transaction opens, commits or rolls back for us.
        with db_handle.get_cursor() as cr:
            sql = "select id from temu_order where temu_delivery_no=%s and logistics_order_no=%s and version_no=%s"
            cr.execute(sql, (order_no, logistics_order_no, str(version_no)))
            row = cr.fetchone()
            if row:
                return_res['sequence'] = version_no
                result = {
                    "orderNo": order_no,
                    "logisticsOrderNo": logistics_order_no,
                    "cartonInfo": [{"cartonNo": "", "logisticsCartonNo": "", "base64": ""}],
                }
                temu_order_id = row[0]
                carton_info_arr = []
                for carton_data in carton_arr:
                    carton_no = carton_data['cartonNo']
                    logistics_carton_no = carton_data['logisticsCartonNo']
                    sql = ("select id,carton_no,service_carton_no,label_file_base64 "
                           "from temu_order_carton "
                           "where carton_no=%s and service_carton_no=%s and order_id=%s")
                    cr.execute(sql, (carton_no, logistics_carton_no, temu_order_id))
                    carton_row = cr.fetchone()
                    if carton_row:
                        c_no = carton_row[1]
                        s_no = carton_row[2]
                        # DB stores base64(zlib-compressed PDF) text
                        compressed_str = carton_row[3]
                        final_base64 = ""
                        if compressed_str:
                            try:
                                # text base64 -> compressed bytes
                                compressed_bytes = base64.b64decode(compressed_str)
                                # compressed bytes -> original PDF bytes
                                original_pdf_bytes = zlib.decompress(compressed_bytes)
                                # PDF bytes -> plain base64 string for the API response
                                final_base64 = base64.b64encode(original_pdf_bytes).decode('utf-8')
                            except Exception as e:
                                _logger.error(f"箱贴 {c_no} 解压失败: {str(e)}")
                                final_base64 = ""  # decompression failure returns empty
                        carton_info_arr.append({
                            "cartonNo": c_no,
                            "logisticsCartonNo": s_no,
                            "base64": final_base64
                        })
                result['cartonInfo'] = carton_info_arr
                return_res['result'] = result
            else:
                return_res['msg'] = '系统未查询到订单'
            is_success = not return_res['msg']
            log_params = (
                f'Temu订单查询箱贴: {order_no}',
                return_res['msg'],
                json.dumps(kw),        # data_text
                is_success,            # success_bl
                kws.get('request_id')  # request_id
            )
            cr.execute(log_sql, log_params)
    except Exception as err:
        return_res['code'] = 500
        return_res['msg'] = str(err)
        _logger.error('temu_query_order_service error:%s' % str(err))
        # Fresh transaction just for the failure audit row.
        try:
            with db_handle.get_cursor() as log_cr:
                log_params = (
                    f'Temu订单查询箱贴: {order_no}',
                    f"系统异常: {str(err)}",   # record the concrete failure
                    json.dumps(kw),
                    False,
                    kws.get('request_id')
                )
                log_cr.execute(log_sql, log_params)
                # with-block commits automatically
        except Exception as log_e:
            _logger.error(f"写入错误日志失败: {str(log_e)}")
    return return_res
services/tiktok_queue.py
deleted
100644 → 0
浏览文件 @
e2472b49
# !/usr/bin/python
# -*- coding:utf-8 -*-
# Author: Zhichang Fu
# Created Time: 2019-03-26 23:11:14
'''
BEGIN
function:
Hello Service
END
'''
import
ast
import
copy
import
json
import
sys
import
time
import
requests
import
logging
import
hashlib
import
uuid
import
redis
from
random
import
Random
import
pandas
as
pd
from
nameko.rpc
import
rpc
,
RpcProxy
from
datetime
import
datetime
,
timedelta
,
timezone
from
dependence.services
import
config
from
dependence.services
import
DbService
from
dependence.services.util
import
YhjCommon
,
Order_dispose
from
dependence.services.redis_service
import
RedisClient
from
logging.handlers
import
RotatingFileHandler
# from line_profiler import LineProfiler
# # 创建一个 LineProfiler 实例
# profiler = LineProfiler()
# sys.path.append("..")
# Root logger at INFO level (lower to DEBUG when troubleshooting).
logging.basicConfig(level=logging.INFO)
# Rotating file handler: log path, 100 MB per file, keep at most 15 backups.
file_log_handler = RotatingFileHandler("logs/tiktok_queue.log", maxBytes=1024 * 1024 * 100, backupCount=15, encoding='utf-8')
# Record format: timestamp, level, message, plus source file and line number.
formatter = logging.Formatter('%(asctime)s %(levelname)s:%(message)s [in %(pathname)s:%(lineno)d]')
# Apply the format to the newly created handler.
file_log_handler.setFormatter(formatter)
# Attach the rotating handler to the ROOT logger so all module loggers inherit it.
logging.getLogger().addHandler(file_log_handler)
# logging.basicConfig(handlers=[logging.FileHandler('logs/tiktok_test.py.log', 'a', 'utf-8')],
#                     format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
_logger = logging.getLogger(__name__)
# Shared DB engine, created once at process start (NOTE(review): a single
# engine reused by all handlers — confirm DbService manages pooling).
db_handle = DbService().conn_engine
# odoo_conn = Order_dispose()
# common = YhjCommon()
# redis_client = RedisClient()
# redis_obj = redis_client.get_redis(config.REDIS_NAME, config.REDIS_HOST, config.REDIS_PORT, config.REDIS_DB)
import concurrent.futures
# Thread pool for concurrent work inside handlers (10 workers).
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
def retry_call_with_args(func, max_retries=3, retry_delay=1, **kwargs):
    """Call ``func(**kwargs)`` up to ``max_retries`` times.

    An attempt counts as failed when it raises, or when its result dict has a
    truthy ``'err_msg'`` entry.  ``retry_delay`` seconds are slept between
    attempts (not after the last one).

    Returns the result of the first successful attempt, or the last failing
    result dict after all retries are exhausted (``None`` if every attempt
    raised).  Previously the successful result was discarded (bare
    ``return``); returning it is backward-compatible because existing callers
    submit this through an executor and ignore the return value.
    """
    last_res = None
    for attempt in range(max_retries):
        try:
            last_res = func(**kwargs)
            if not last_res.get('err_msg'):
                return last_res  # success: hand the result back to the caller
            logging.warning(f"{func.__name__} 第 {attempt+1} 次执行失败,尝试重试")
        except Exception as e:
            logging.error(f"{func.__name__} 第 {attempt+1} 次抛出异常: {e}")
        if attempt < max_retries - 1:
            time.sleep(retry_delay)
    logging.error(f"{func.__name__} 最多重试 {max_retries} 次失败,放弃,数据:{kwargs}")
    return last_res
class Order_dispose(object):
    """Queue-consumer handlers for TikTok customs data.

    Persists incoming parcel ("small package") declarations and master
    waybill / big-bag relations into the warehouse tables via the shared
    module-level ``db_handle`` engine.
    """

    def get_rf_to_time(self, rfc3339_string):
        """Convert an RFC 3339 timestamp string to ``'%Y-%m-%d %H:%M:%S'`` in UTC.

        ``rfc3339_string`` may end in ``'Z'`` (mapped to ``+00:00``), carry an
        explicit offset, or be naive (treated as already-UTC, matching the
        original behavior).

        Fix: the original used ``dt.replace(tzinfo=timezone.utc)``, which
        relabels a non-UTC offset as UTC instead of converting it (e.g.
        ``10:00+08:00`` came back as ``10:00`` instead of ``02:00``).
        ``astimezone`` performs the actual conversion.
        """
        dt = datetime.fromisoformat(rfc3339_string.replace('Z', '+00:00'))
        if dt.tzinfo is None:
            # Naive input: keep the original assumption that it is UTC.
            dt = dt.replace(tzinfo=timezone.utc)
        dt_utc = dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        print("Parsed datetime (UTC):", dt_utc)
        return dt_utc
    def tiktok_package_declare(self, **kw):
        """Ingest TikTok customs-declaration parcels ("small packages").

        For every entry in ``kw['packages']`` this either inserts a new
        ``cc_ship_package`` row (plus its ``cc_package_good`` item rows) or,
        when a recent row with the same ``logistic_order_no`` already exists
        and is either cancelled or not yet attached to a bill of lading,
        updates the row in place and replaces its item rows.  The outcome is
        always recorded in the ``ao_tt_api_log`` audit table.

        Expected ``kw`` keys (assumed from usage -- TODO confirm against the
        queue producer): ``packages`` (list of parcel dicts) and
        ``request_id``.

        Returns a summary dict: ``all_result`` (bool), 
        ``failed_provider_order_ids`` (list) and ``err_msg`` (dict keyed by
        provider_order_id).
        """
        msg = ''
        return_res = {
            "all_result": True,
            "failed_provider_order_ids": [],
            "err_msg": {},
        }
        logistic_order_no = ''
        data_text = ''
        utc_time = ''
        try:
            # Look up the de-duplication window (days) and the default
            # customer id from the system-parameter table.
            push_interval_15_days = 15
            tt_customer_id = None
            sql = "select key,value from ir_config_parameter where key='tk_push_interval_15_days' or key='tt_customer_id'"
            result_arr = pd.read_sql(sql, con=db_handle)
            for res in result_arr.itertuples():
                if res.key == 'tk_push_interval_15_days':
                    push_interval_15_days = int(res.value)
                elif res.key == 'tt_customer_id':
                    tt_customer_id = int(res.value)
            # Default workflow node (status) for newly created packages.
            node_id = None
            sql = "select id from cc_node where node_type='package' and is_default=True limit 1"
            node_arr = pd.read_sql(sql, con=db_handle)
            for node_res in node_arr.itertuples():
                node_id = int(node_res.id)
            parcel_arr = kw['packages']
            # Only rows created inside the window are considered "existing".
            date = datetime.now() - timedelta(days=int(push_interval_15_days))
            create_package_arr = []       # vals of packages to INSERT
            delete_goods_ids = []         # cc_package_good ids to delete
            package_no_id_res = {}        # logistic_order_no -> (id, bl_id)
            create_package_no_arr = []    # logistic_order_nos being created
            package_goods_vals_arr = []   # vals of item rows to INSERT
            data_text = json.dumps(parcel_arr)
            utc_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
            for package in parcel_arr:
                try:
                    logistic_order_no = package.get('provider_order_id')
                    sql = "select id,bl_id,is_cancel from cc_ship_package where logistic_order_no=%s and create_date >= %s"
                    parcel_sql_result = pd.read_sql(sql, con=db_handle, params=(logistic_order_no, date))
                    parcel_id = ''
                    parcel_bl_id = ''
                    for parcel_res in parcel_sql_result.itertuples():
                        # If several rows match, the last row wins.
                        parcel_id = parcel_res.id
                        parcel_bl_id = parcel_res.bl_id
                        parcel_is_cancel = parcel_res.is_cancel
                    # Process when the package is new, or exists but is not
                    # yet on a BL / was cancelled.  NOTE: parcel_is_cancel is
                    # only bound when a row was found; short-circuiting on
                    # parcel_id keeps the condition safe when it was not.
                    if (parcel_id and (not parcel_bl_id or parcel_is_cancel)) or not parcel_id:
                        ship_package_vals = dict(
                            create_date=utc_time,
                            state=node_id,
                            is_cancel=False,
                            cancel_reason='',
                            logistic_order_no=package.get('provider_order_id'),
                            tracking_no=package.get('tracking_no'),
                            customer_ref=package.get('declaretion_bill_id'),
                            internal_account_number="",
                            user_track_note=package.get('remark'),
                            company_code=package.get('entity_code'),
                            trade_no=package.get('order_no'),
                            # operate_time is an epoch timestamp in
                            # milliseconds; convert to a datetime string.
                            operation_time=datetime.fromtimestamp(int(package.get('operate_time')) / 1000).strftime('%Y-%m-%d %H:%M:%S'),
                            big_package_no=package.get('big_bag_no'),
                            container_no=package.get('container_no'),
                            buyer_region=package.get('buyer_region'),
                            next_provider_name=package.get('next_provider_name'),
                            sender_name=package.get('sender_info').get('name'),
                            sender_vat_no=package.get('sender_info').get('shipping_tax_id'),
                            sender_phone=package.get('sender_info').get('phone'),
                            sender_country=package.get('sender_info').get('address').get('address_l0'),
                            sender_state=package.get('sender_info').get('address').get('address_l1'),
                            sender_city=package.get('sender_info').get('address').get('address_l2'),
                            sender_add_1=package.get('sender_info').get('address').get('address_l3'),
                            sender_add_2=package.get('sender_info').get('address').get('address_l4'),
                            sender_add_3=package.get('sender_info').get('address').get('details'),
                            sender_postcode=package.get('sender_info').get('postcode'),
                            receiver_name=package.get('receiver_info').get('name'),
                            receiver_phone=package.get('receiver_info').get('phone'),
                            receiver_postcode=package.get('receiver_info').get('postcode'),
                            receiver_add_1=package.get('receiver_info').get('address').get('address_l0'),
                            receiver_add_2=package.get('receiver_info').get('address').get('address_l1'),
                            receiver_add_3=package.get('receiver_info').get('address').get('address_l2'),
                            receiver_city=package.get('receiver_info').get('address').get('address_l3'),
                            receiver_county=package.get('receiver_info').get('address').get('address_l4'),
                            receiver_detailed_address=package.get('receiver_info').get('address').get('details'),
                            receiver_vat_no=package.get('receiver_info').get('tax_id'),
                            currency=package.get('currency'),
                            gross_weight=package.get('package').get('real_weight'),
                            weight_unit=package.get('package').get('weight_unit'),
                            total_value=package.get('value').get('goods_value'),
                            customer_id=tt_customer_id,
                            is_sync=True)
                        # Collect the parcel's item (goods) rows.
                        if package.get('items') and len(package.get('items')) > 0:
                            good_id_arr = []
                            for item in package.get('items'):
                                item_id = item.get('item_id')
                                # De-duplicate items by item_id within one
                                # parcel.  NOTE(review): nesting of the dict
                                # build under this check reconstructed from a
                                # garbled diff -- confirm against history.
                                if item_id not in good_id_arr:
                                    good_id_arr.append(item_id)
                                    package_good = dict(
                                        create_date=utc_time,
                                        # Temporarily holds the order no;
                                        # swapped for the package row id
                                        # before the bulk insert below.
                                        bl_line_id=logistic_order_no,
                                        bl_id=False,
                                        item_id=item_id,
                                        sku_id=item.get('sku_id'),
                                        item_name_cn=item.get('product_name_cn'),
                                        item_name_en=item.get('product_name'),
                                        export_hs_code=item.get('export_hscode'),
                                        import_hs_code=item.get('import_hscode'),
                                        weight=item.get('weight') or None,
                                        quantity=item.get('qty'),
                                        quantity_unit=item.get('unit'),
                                        declare_price=item.get('unit_price') or None,
                                        freight=item.get('shipping_fee') or None,
                                        cod_amount=item.get('cod_fee') or None,
                                        vat_rate=item.get('vat_rate') or None,
                                        item_vat=item.get('vat_fee') or None,
                                        origin_country=item.get('origin_country'),
                                        item_type=item.get('item_type') or None,
                                        item_total_price=item.get('unit_price') or None,
                                        item_link=item.get('item_url'),
                                        item_tax_status=item.get('tax_mark') or None,
                                    )
                                    # _logger.info('package_good:%s' % package_good)
                                    if package_good:
                                        package_goods_vals_arr.append(package_good)
                        if not parcel_id:
                            # New package: queue for bulk insert.
                            create_package_no_arr.append(logistic_order_no)
                            create_package_arr.append(ship_package_vals)
                        else:
                            # Existing package: remember its ids, mark its
                            # old item rows for deletion, and update in place.
                            package_no_id_res[logistic_order_no] = (parcel_id, parcel_bl_id)
                            sql = 'select id from cc_package_good where bl_line_id = %s'
                            delete_good_result = pd.read_sql(sql, con=db_handle, params=(parcel_id,))
                            origin_goods_ids = delete_good_result['id'].tolist()
                            delete_goods_ids += origin_goods_ids
                            # Update the existing cc_ship_package row.
                            update_sql = """
                                UPDATE cc_ship_package
                                SET
                                    is_cancel = %s,
                                    cancel_reason = %s,
                                    logistic_order_no = %s,
                                    tracking_no = %s,
                                    customer_ref = %s,
                                    internal_account_number = %s,
                                    user_track_note = %s,
                                    company_code = %s,
                                    trade_no = %s,
                                    operation_time = %s,
                                    big_package_no = %s,
                                    container_no = %s,
                                    buyer_region = %s,
                                    next_provider_name = %s,
                                    sender_name = %s,
                                    sender_vat_no = %s,
                                    sender_phone = %s,
                                    sender_country = %s,
                                    sender_state = %s,
                                    sender_city = %s,
                                    sender_add_1 = %s,
                                    sender_add_2 = %s,
                                    sender_add_3 = %s,
                                    sender_postcode = %s,
                                    receiver_name = %s,
                                    receiver_phone = %s,
                                    receiver_postcode = %s,
                                    receiver_add_1 = %s,
                                    receiver_add_2 = %s,
                                    receiver_add_3 = %s,
                                    receiver_city = %s,
                                    receiver_county = %s,
                                    receiver_detailed_address = %s,
                                    receiver_vat_no = %s,
                                    currency = %s,
                                    gross_weight = %s,
                                    weight_unit = %s,
                                    total_value = %s,
                                    customer_id = %s
                                WHERE id = %s;
                            """
                            # Parameter tuple in the same order as the SET list.
                            params = (
                                False,  # is_cancel
                                '',     # cancel_reason
                                ship_package_vals['logistic_order_no'],
                                ship_package_vals['tracking_no'],
                                ship_package_vals['customer_ref'],
                                '',     # internal_account_number
                                ship_package_vals['user_track_note'],
                                ship_package_vals['company_code'],
                                ship_package_vals['trade_no'],
                                ship_package_vals['operation_time'],
                                ship_package_vals['big_package_no'],
                                ship_package_vals['container_no'],
                                ship_package_vals['buyer_region'],
                                ship_package_vals['next_provider_name'],
                                ship_package_vals['sender_name'],
                                ship_package_vals['sender_vat_no'],
                                ship_package_vals['sender_phone'],
                                ship_package_vals['sender_country'],
                                ship_package_vals['sender_state'],
                                ship_package_vals['sender_city'],
                                ship_package_vals['sender_add_1'],
                                ship_package_vals['sender_add_2'],
                                ship_package_vals['sender_add_3'],
                                ship_package_vals['sender_postcode'],
                                ship_package_vals['receiver_name'],
                                ship_package_vals['receiver_phone'],
                                ship_package_vals['receiver_postcode'],
                                ship_package_vals['receiver_add_1'],
                                ship_package_vals['receiver_add_2'],
                                ship_package_vals['receiver_add_3'],
                                ship_package_vals['receiver_city'],
                                ship_package_vals['receiver_county'],
                                ship_package_vals['receiver_detailed_address'],
                                ship_package_vals['receiver_vat_no'],
                                ship_package_vals['currency'],
                                ship_package_vals['gross_weight'],
                                ship_package_vals['weight_unit'],
                                ship_package_vals['total_value'],
                                tt_customer_id,
                                parcel_id  # id
                            )
                            # NOTE(review): pd.read_sql is used here to run an
                            # UPDATE -- driver-dependent; confirm this commits
                            # as expected.
                            pd.read_sql(update_sql, con=db_handle, params=params, chunksize=100)
                except Exception as err:
                    # Per-parcel failure: record it and continue with the rest.
                    return_res['all_result'] = False
                    return_res['failed_provider_order_ids'].append(package.get('provider_order_id'))
                    return_res['err_msg'].update({package.get('provider_order_id'): str(err)})
                    msg = str(err)
                    _logger.error('package_declare error:%s' % str(err))
                    _logger.error('小包异常:%s' % kw)
            # Bulk-flush inserts inside one transaction (commits on success).
            with db_handle.begin() as connect:
                if create_package_arr:
                    val_df = pd.DataFrame(create_package_arr)
                    val_df.to_sql('cc_ship_package', con=connect, if_exists='append', index=False)
                    # Read back the ids/bl_ids of the rows just inserted.
                    package_no_str = '(%s)' % str(create_package_no_arr)[1:-1]
                    sql = 'select id,logistic_order_no,bl_id from cc_ship_package where logistic_order_no in %s' % package_no_str
                    new_order_arr = pd.read_sql(sql, con=connect)
                    for new_order_data in new_order_arr.itertuples():
                        package_no_id_res[new_order_data.logistic_order_no] = (new_order_data.id, new_order_data.bl_id)
                if delete_goods_ids:
                    # Remove the stale item rows of updated packages.
                    ids = '(%s)' % str(delete_goods_ids)[1:-1]
                    delete_goods_sql = 'delete from cc_package_good where id in %s' % ids
                    pd.read_sql(delete_goods_sql, con=connect, chunksize=100)
                if package_goods_vals_arr:
                    # Swap the placeholder order no for the real package/BL ids.
                    for package_goods_vals in package_goods_vals_arr:
                        logistic_order_no = package_goods_vals['bl_line_id']
                        data_list = package_no_id_res.get(logistic_order_no, [])
                        package_goods_vals['bl_line_id'] = data_list[0] if data_list else None
                        package_goods_vals['bl_id'] = data_list[1] if data_list else None
                    val_df = pd.DataFrame(package_goods_vals_arr)
                    val_df.to_sql('cc_package_good', con=connect, if_exists='append', index=False)
        except Exception as err:
            # Batch-level failure (keyed by the last parcel processed).
            return_res['all_result'] = False
            return_res['failed_provider_order_ids'].append(logistic_order_no)
            return_res['err_msg'].update({logistic_order_no: str(err)})
            _logger.error('package_declare error:%s' % str(err))
            msg = str(err)
            _logger.error('小包异常:%s' % kw)
        # Audit trail: one row per call, whether it succeeded or not.
        val = {'big_bag_no': logistic_order_no,
               'error_msg': msg,
               'push_time': utc_time,
               'data_text': data_text,
               'success_bl': return_res['all_result'],
               'request_id': kw['request_id'],
               'source': '推入',
               'create_date': datetime.utcnow()}
        val_df = pd.DataFrame(val, index=[0])
        val_df.to_sql('ao_tt_api_log', con=db_handle, if_exists='append', index=False)
        return return_res
    def mawb_declare(self, **kws):
        """Ingest a master waybill (bill of lading) and its big-bag/parcel links.

        Depending on ``kws['declare_type']`` ('create' or 'update') this
        inserts or updates a ``cc_bl`` row, rebuilds its ``cc_big_package``
        rows, re-attaches the referenced ``cc_ship_package`` /
        ``cc_package_good`` rows, refreshes the aggregate counters/amounts on
        the BL, and best-effort matches last-mile providers.  Every call is
        recorded in the ``ao_tt_api_log`` audit table.

        Expected ``kws`` keys (assumed from usage -- TODO confirm against the
        producer): ``master_waybill_no``, ``mwb_info``, ``declare_type``,
        ``big_bag_list``, ``big_bag_quantity``, ``buyer_region``,
        ``request_id`` and related fields.

        Returns ``{"err_msg": <'' on success, else message>}``.
        """
        return_res = {
            "err_msg": '',
        }
        master_waybill_no = ''
        data_text = ''
        utc_time = ''
        try:
            # Look up the de-duplication window (days) and default customer.
            push_interval_15_days = 15
            tt_customer_id = None
            sql = "select key,value from ir_config_parameter where key='tk_push_interval_15_days' or key='tt_customer_id'"
            result_arr = pd.read_sql(sql, con=db_handle)
            for res in result_arr.itertuples():
                if res.key == 'tk_push_interval_15_days':
                    push_interval_15_days = int(res.value)
                elif res.key == 'tt_customer_id':
                    tt_customer_id = int(res.value)
            # Default workflow node (status) for BLs.
            node_id = None
            sql = "select id from cc_node where node_type='bl' and is_default=True limit 1"
            node_arr = pd.read_sql(sql, con=db_handle)
            for node_res in node_arr.itertuples():
                node_id = int(node_res.id)
            # Waybill number and header info from the message.
            master_waybill_no = kws['master_waybill_no']
            mawb_info = kws.get('mwb_info')
            date = datetime.now() - timedelta(days=int(push_interval_15_days))
            data_text = json.dumps(kws)
            utc_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
            declare_type = kws.get('declare_type')
            if declare_type:
                declare_type = declare_type.lower()
            # Values for a freshly created cc_bl row.
            bl_vals = dict(bl_no=master_waybill_no,
                           customs_bl_no=kws.get('customs_waybill_id'),
                           trade_type=kws.get('trade_type'),
                           big_package_qty=kws.get('big_bag_quantity'),
                           big_package_sell_country=kws.get('buyer_region'),
                           declare_type=kws.get('declare_type'),
                           transport_tool_code=mawb_info.get('transport_code'),
                           transport_tool_name=mawb_info.get('transport_name'),
                           start_port_code=mawb_info.get('depart_port_code'),
                           end_port_code=mawb_info.get('arrive_port_code'),
                           billing_weight=mawb_info.get('chargable_weight') or None,
                           actual_weight=mawb_info.get('real_weight') or None,
                           etd=mawb_info.get('etd'),
                           eta=mawb_info.get('eta'),
                           customer_id=tt_customer_id,
                           is_cancel=False,
                           customs_clearance_status=node_id,
                           state='draft',
                           create_date=utc_time,
                           is_bl_sync=True,
                           bl_date=datetime.now().strftime('%Y-%m-%d'),
                           cc_deadline=(datetime.now() + timedelta(days=5)).strftime('%Y-%m-%d'))
            # Look for a recent, non-cancelled BL with the same number.
            bl_id = ''
            bl_state = ''
            sql = "select id,state from cc_bl where bl_no=%s and is_cancel=False and create_date >= %s limit 1"
            waybill_sql_result = pd.read_sql(sql, con=db_handle, params=(master_waybill_no, date))
            for waybill_sql_res in waybill_sql_result.itertuples():
                bl_id = waybill_sql_res.id
                bl_state = waybill_sql_res.state
            exit_bl_obj = []
            if declare_type == 'create':
                # For 'create': if a BL with the same number AND big-bag count
                # already exists, the message is a duplicate -- skip re-linking.
                select_bl_sql = """select id from cc_bl where bl_no='{0}' and big_package_qty={1};""".format(kws.get('master_waybill_no'), kws.get('big_bag_quantity'))
                select_sql_result = pd.read_sql(select_bl_sql, con=db_handle)
                exit_bl_obj = select_sql_result
            if not len(waybill_sql_result):
                if declare_type == 'create':
                    # Insert the new BL and seed its clearance-file checklist.
                    val_df = pd.DataFrame(bl_vals, index=[0])
                    val_df.to_sql('cc_bl', con=db_handle, if_exists='append', index=False)
                    file_name_arr = ['主单', '货站提货POD', 'Manifest格式和数据(cvs/excel格式,系统目前不支持,线下提供或保留现有方式)', '海关CDS申报单(import和授权方式检查拉齐等)', '尾程交接POD(待大包数量和箱号)']
                    sql = "select id,state from cc_bl where bl_no=%s limit 1"
                    new_bl_result = pd.read_sql(sql, con=db_handle, params=(master_waybill_no,))
                    # new_bl_id = ''
                    for new_bl_res in new_bl_result.itertuples():
                        bl_id = new_bl_res.id
                    if bl_id:
                        file_vals_arr = [{'file_name': i, 'bl_id': bl_id} for i in file_name_arr]
                        val_df = pd.DataFrame(file_vals_arr)
                        val_df.to_sql('cc_clearance_file', con=db_handle, if_exists='append', index=False)
            else:
                # An existing BL may only be updated while still in draft.
                if declare_type == 'update' and bl_state == 'draft':
                    update_sql = """
                        UPDATE cc_bl
                        SET
                            customs_bl_no = %s,
                            trade_type = %s,
                            big_package_qty = %s,
                            big_package_sell_country = %s,
                            declare_type = %s,
                            transport_tool_code = %s,
                            transport_tool_name = %s,
                            start_port_code = %s,
                            end_port_code = %s,
                            billing_weight = %s,
                            actual_weight = %s,
                            etd = %s,
                            eta = %s,
                            customer_id = %s,
                            customs_clearance_status = %s
                        WHERE id = %s;
                    """
                    params = (
                        kws.get('customs_waybill_id'),
                        kws.get('trade_type'),
                        kws.get('big_bag_quantity'),
                        kws.get('buyer_region'),
                        kws.get('declare_type'),
                        mawb_info.get('transport_code'),
                        mawb_info.get('transport_name'),
                        mawb_info.get('depart_port_code'),
                        mawb_info.get('arrive_port_code'),
                        mawb_info.get('chargable_weight') or None,
                        mawb_info.get('real_weight') or None,
                        mawb_info.get('etd'),
                        mawb_info.get('eta'),
                        tt_customer_id,
                        node_id,
                        bl_id  # WHERE clause value
                    )
                    # NOTE(review): pd.read_sql used for UPDATE statements --
                    # driver-dependent; confirm it commits as expected.
                    pd.read_sql(update_sql, con=db_handle, params=params, chunksize=100)
            # Rebuild big-bag / parcel relations when the BL is a fresh create
            # (no duplicate) or a draft being updated.
            if bl_id and ((declare_type == 'create' and len(exit_bl_obj) <= 0) or (declare_type == 'update' and bl_state == 'draft')):
                big_bag_list = kws.get('big_bag_list')
                if big_bag_list and len(big_bag_list) > 0:
                    # Drop all big bags currently attached to this BL
                    # (can be 100+ rows) and detach its parcels.
                    delete_big_sql = "delete from cc_big_package where bl_id=%s"
                    pd.read_sql(delete_big_sql, con=db_handle, params=(bl_id,), chunksize=100)
                    update_package_sql = "update cc_ship_package set bl_id=null,big_package_id=null where bl_id=%s"
                    pd.read_sql(update_package_sql, con=db_handle, params=(bl_id,), chunksize=100)
                    # Bulk-create all big bags for this BL.
                    big_bag_vals_arr = [
                        {'bl_id': bl_id,
                         'big_package_no': big_bag.get('big_bag_no'),
                         'next_provider_name': big_bag.get('next_provider_name'),
                         'create_date': utc_time,
                         'tally_state': 'unprocessed_goods'}
                        for big_bag in big_bag_list
                    ]
                    val_df = pd.DataFrame(big_bag_vals_arr)
                    val_df.to_sql('cc_big_package', con=db_handle, if_exists='append', index=False)
                    # Read back the ids of the big bags just inserted.
                    big_bag_no_list = [big_bag.get('big_bag_no') for big_bag in big_bag_list]
                    big_bag_no_placeholder = ', '.join(['%s'] * len(big_bag_no_list))
                    big_sql = f"SELECT id, big_package_no FROM cc_big_package WHERE big_package_no IN ({big_bag_no_placeholder}) AND create_date >= %s"
                    big_package_results = pd.read_sql(big_sql, con=db_handle, params=big_bag_no_list + [date])
                    big_package_dict = {row['big_package_no']: row['id'] for row in big_package_results.to_dict(orient='records')}
                    for big_bag in big_bag_list:
                        big_bag_no = big_bag.get('big_bag_no')
                        package_list = big_bag.get('package_list', [])
                        big_package_id = big_package_dict.get(big_bag_no)
                        # Attach each parcel of this big bag (cc.ship.package).
                        if package_list and len(package_list) > 0:
                            # Parcel order numbers declared under this big bag.
                            package_ids = [package.get('provider_order_id') for package in package_list]
                            package_ids_str = '(%s)' % str(package_ids)[1:-1]
                            package_sql = "select id,logistic_order_no from cc_ship_package where logistic_order_no in {} and create_date >= %s".format(package_ids_str)
                            package_sql_result = pd.read_sql(package_sql, con=db_handle, params=(date,))
                            if len(package_sql_result) > 0:
                                package_id_arr = package_sql_result['id'].tolist()
                                ids_str = '(%s)' % str(package_id_arr)[1:-1]
                                update_ship_sql = "update cc_ship_package set is_cancel=False,cancel_reason='',big_package_id=%s,bl_id=%s where id in {0}".format(ids_str)
                                pd.read_sql(update_ship_sql, con=db_handle, params=(big_package_id, bl_id), chunksize=100)
                                # Re-link the parcel item rows as well.
                                update_ship_sql = "update cc_package_good set big_package_id=%s,bl_id=%s where bl_line_id in {0}".format(ids_str)
                                pd.read_sql(update_ship_sql, con=db_handle, params=(big_package_id, bl_id), chunksize=100)
                            if len(package_ids) != len(package_sql_result):
                                # Log declared parcels that have no
                                # cc_ship_package row yet.
                                logistic_order_no_arr = package_sql_result['logistic_order_no'].tolist()
                                package_ids = set(package_ids)
                                ship_package_ids = set(logistic_order_no_arr)
                                diff_ids = package_ids - ship_package_ids
                                _logger.info('diff_ids:%s' % diff_ids)
                        else:
                            return_res['err_msg'] = 'Big bag [%s] not include any package. ' % big_bag.get('big_bag_no')
                else:
                    return_res['err_msg'] = 'Big bag list is empty.'
                if not return_res['err_msg']:
                    # Refresh the BL-level aggregates (big-bag / parcel /
                    # goods counts) from the goods table.
                    update_bl_info_sql = """
                        UPDATE cc_bl SET bl_total_big_qty = s.big_num, bl_total_qty=s.good_num, bl_ship_package_qty=s.ship_num from (
                            SELECT bl_id, COUNT(DISTINCT big_package_id) as big_num , COUNT(DISTINCT bl_line_id) as ship_num, COUNT(*) AS good_num
                            FROM cc_package_good
                            WHERE bl_id = {0}
                            GROUP BY bl_id
                        ) s
                        where id = s.bl_id
                        ;
                    """.format(bl_id)
                    pd.read_sql(update_bl_info_sql, con=db_handle, chunksize=100)
                    # Refresh per-big-bag parcel and goods counters.
                    sql = "select id from cc_big_package where bl_id=%s"
                    big_sql_result = pd.read_sql(sql, con=db_handle, params=(bl_id,))
                    big_sql_result_ids = big_sql_result['id'].tolist()
                    big_ids_str = '(%s)' % str(big_sql_result_ids)[1:-1]
                    update_big_info_sql = """
                        UPDATE cc_big_package SET ship_package_qty = s.ship_num, goods_qty=s.good_num from (
                            SELECT big_package_id, COUNT(DISTINCT bl_line_id) as ship_num, COUNT(*) AS good_num
                            FROM cc_package_good
                            WHERE big_package_id IN {0}
                            GROUP BY big_package_id
                        ) s
                        where id = s.big_package_id
                        ;
                    """.format(big_ids_str)
                    pd.read_sql(update_big_info_sql, con=db_handle, chunksize=100)
                    # Refresh the BL total declared amount.
                    update_bl_amount_sql = """
                        UPDATE cc_bl
                        SET bl_total_amount = (
                            SELECT SUM(total_value)
                            FROM cc_ship_package
                            WHERE bl_id = %s
                        )
                        WHERE id = %s;
                    """
                    pd.read_sql(update_bl_amount_sql, con=db_handle, params=(bl_id, bl_id,), chunksize=100)
                    # Best-effort: match last-mile providers for this BL from
                    # the big bags' next_provider_name values.
                    try:
                        _logger.info(f"开始更新提单 {bl_id} 的尾程服务商")
                        # Clear the existing BL<->provider links first.
                        clear_sql = "DELETE FROM cc_bill_loading_last_mile_provider_rel WHERE bl_id = %s"
                        pd.read_sql(clear_sql, con=db_handle, params=(bl_id,), chunksize=100)
                        # Distinct, normalized carrier names on this BL's bags.
                        get_names_sql = """
                            SELECT DISTINCT LOWER(TRIM(bp.next_provider_name)) as provider_name
                            FROM cc_big_package bp
                            WHERE bp.bl_id = %s
                              AND bp.next_provider_name IS NOT NULL
                              AND bp.next_provider_name != ''
                        """
                        names_result = pd.read_sql(get_names_sql, con=db_handle, params=(bl_id,))
                        _logger.info(f"提单 {bl_id} 找到 {len(names_result)} 个快递名称")
                        if not names_result.empty:
                            # All configured providers with matching values.
                            get_providers_sql = """
                                SELECT id, matching_value
                                FROM cc_last_mile_provider
                                WHERE matching_value IS NOT NULL AND matching_value != ''
                            """
                            providers_result = pd.read_sql(get_providers_sql, con=db_handle)
                            _logger.info(f"找到 {len(providers_result)} 个尾程快递配置")
                            # Build normalized-name -> [provider ids] mapping;
                            # matching_value is a newline-separated list.
                            provider_mapping = {}
                            for _, provider_row in providers_result.iterrows():
                                provider_id = provider_row['id']
                                matching_values = [value.lower().strip() for value in provider_row['matching_value'].split('\n') if value.strip()]
                                for value in matching_values:
                                    if value not in provider_mapping:
                                        provider_mapping[value] = []
                                    provider_mapping[value].append(provider_id)
                            # Collect every provider matched by any bag name.
                            matched_provider_ids = set()
                            for _, name_row in names_result.iterrows():
                                provider_name = name_row['provider_name']
                                if provider_name in provider_mapping:
                                    matched_provider_ids.update(provider_mapping[provider_name])
                                    _logger.info(f"匹配到快递名称: {provider_name} -> {provider_mapping[provider_name]}")
                            # Bulk-insert the matches inside a transaction.
                            if matched_provider_ids:
                                insert_data = [(bl_id, provider_id) for provider_id in matched_provider_ids]
                                insert_df = pd.DataFrame(insert_data, columns=['bl_id', 'last_mile_provider_id'])
                                with db_handle.begin() as conn:
                                    insert_df.to_sql('cc_bill_loading_last_mile_provider_rel', con=conn, if_exists='append', index=False, method='multi')
                                _logger.info(f"成功插入 {len(matched_provider_ids)} 个尾程服务商关联")
                            else:
                                _logger.warning(f"提单 {bl_id} 没有匹配到任何尾程服务商")
                        else:
                            _logger.warning(f"提单 {bl_id} 没有找到任何快递名称")
                    except Exception as e:
                        _logger.error(f"更新尾程服务商失败: {str(e)}")
                        _logger.error(f"提单ID: {bl_id}")
                        # Deliberately swallowed: provider matching is
                        # best-effort and must not fail the declaration.
        except Exception as err:
            return_res['err_msg'] = str(err)
            _logger.error('mawb_declare error:%s' % str(err))
            _logger.error('提单异常:%s' % kws)
        # Audit trail: one row per call, whether it succeeded or not.
        val = {'big_bag_no': '提单下发:%s' % master_waybill_no,
               'error_msg': return_res['err_msg'],
               'push_time': utc_time,
               'data_text': data_text,
               'success_bl': False if return_res['err_msg'] else True,
               'request_id': kws['request_id'],
               'source': '推入',
               'create_date': datetime.utcnow()}
        val_df = pd.DataFrame(val, index=[0])
        val_df.to_sql('ao_tt_api_log', con=db_handle, if_exists='append', index=False)
        # return_res
        return return_res
try:
    # One pooled Redis connection for the lifetime of the worker process.
    connection_pool = redis.ConnectionPool(**config.redis_options)
    redis_conn = redis.Redis(connection_pool=connection_pool)
    logging.info(u'redis连接成功')
    dispatcher = Order_dispose()
    # Route each task type to its handler method.
    handlers = {
        'package': dispatcher.tiktok_package_declare,
        'mawb': dispatcher.mawb_declare,
    }
    while True:
        try:
            # Blocking pop (timeout=0 waits forever) for the next queued task.
            _, raw_payload = redis_conn.brpop(['tiktok_parcel_data'], 0)
            task = json.loads(raw_payload)
            handler = handlers.get(task.get('type'))
            payload = task['result']
            if handler is not None:
                # Run the handler on the thread pool with up to 3 attempts,
                # 1 second apart.
                executor.submit(retry_call_with_args, handler, 3, 1, **payload)
            else:
                logging.error('未找到数据类型')
        except Exception as exc:
            # Keep the consumer alive no matter what a single task does.
            logging.error('error:%s' % str(exc))
            continue
except Exception as exc:
    logging.error("登录失败:%s" % exc)
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论