Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
X
xqh_temu_api
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
lqy
xqh_temu_api
Commits
3cc414d5
提交
3cc414d5
authored
3月 06, 2026
作者:
刘擎阳
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
1.新增轨迹接口
上级
55fb4906
隐藏空白字符变更
内嵌
并排
正在显示
2 个修改的文件
包含
454 行增加
和
1 行删除
+454
-1
temu_api.py
app/api/temu_api.py
+101
-0
temu_service.py
services/temu_service.py
+353
-1
没有找到文件。
app/api/temu_api.py
浏览文件 @
3cc414d5
...
...
@@ -237,3 +237,104 @@ def temu_cancel_order():
res
[
'msg'
]
=
'参数未传'
return
jsonify
(
res
)
@api.route
(
'/gettrack'
,
methods
=
[
'post'
])
# @check_sign
def
temu_get_track
():
"""获取轨迹"""
res
=
{
"success"
:
True
,
"errorCode"
:
0
,
"errorMsg"
:
"success"
,
"requestID"
:
"202312251715021060522200417739B9"
,
"serverTimeMs"
:
int
(
time
.
time
()
*
1000
),
"result"
:
{
"trackList"
:
[]
}
}
request_time
=
datetime
.
utcnow
()
timestamp
=
int
(
time
.
time
())
# res['ts'] = request_time.strftime("%Y-%m-%d %H:%M:%S")
res
[
'requestID'
]
=
request_time
.
strftime
(
"
%
Y
%
m
%
d
%
H
%
M
%
S"
)
+
str
(
timestamp
)
request_data
=
request
.
get_json
()
_logger
.
info
(
'temu_get_track:
%
s'
%
request_data
)
result
=
{}
if
request_data
:
# print(type(request_data))
data
=
request_data
result
[
'request_id'
]
=
res
[
'requestID'
]
result
[
'data'
]
=
data
logistics_carton_no
=
data
[
'logisticsCartonNo'
]
if
data
.
get
(
'logisticsCartonNo'
)
else
''
logistics_order_no
=
data
[
'logisticsOrderNo'
]
if
data
.
get
(
'logisticsOrderNo'
)
else
''
if
not
logistics_carton_no
and
not
logistics_order_no
:
res
[
'success'
]
=
False
res
[
'errorCode'
]
=
1007
res
[
'errorMsg'
]
=
'参数必填'
if
res
[
'errorCode'
]
==
0
:
# logging.info('推入redis')
# push_data = {'type': 'package', 'result': result}
# r_conn.lpush('tiktok_parcel_data', json.dumps(push_data))
# tiktok_package_declare.delay(**result)
return_res
=
rpc
.
temu_service
.
temu_get_track_service
(
**
result
)
if
return_res
:
if
return_res
[
'errorMsg'
]:
res
[
'success'
]
=
False
res
[
'errorCode'
]
=
1008
res
[
'errorMsg'
]
=
return_res
[
'errorMsg'
]
else
:
res
[
'result'
]
=
return_res
[
'result'
]
else
:
res
[
'errorCode'
]
=
5000
res
[
'errorMsg'
]
=
'参数未传'
return
jsonify
(
res
)
@api.route
(
'/getpod'
,
methods
=
[
'post'
])
# @check_sign
def
temu_get_pod
():
"""获取POD"""
res
=
{
"success"
:
True
,
"errorCode"
:
0
,
"errorMsg"
:
"success"
,
"requestID"
:
"202312251715021060522200417739B9"
,
"serverTimeMs"
:
int
(
time
.
time
()
*
1000
),
"result"
:
None
,
}
request_time
=
datetime
.
utcnow
()
timestamp
=
int
(
time
.
time
())
# res['ts'] = request_time.strftime("%Y-%m-%d %H:%M:%S")
res
[
'requestID'
]
=
request_time
.
strftime
(
"
%
Y
%
m
%
d
%
H
%
M
%
S"
)
+
str
(
timestamp
)
request_data
=
request
.
get_json
()
_logger
.
info
(
'temu_get_pod:
%
s'
%
request_data
)
result
=
{}
if
request_data
:
# print(type(request_data))
data
=
request_data
result
[
'request_id'
]
=
res
[
'requestID'
]
result
[
'data'
]
=
data
logistics_carton_no
=
data
[
'logisticsCartonNo'
]
if
data
.
get
(
'logisticsCartonNo'
)
else
''
logistics_order_no
=
data
[
'logisticsOrderNo'
]
if
data
.
get
(
'logisticsOrderNo'
)
else
''
if
not
logistics_carton_no
and
not
logistics_order_no
:
res
[
'errorCode'
]
=
1007
res
[
'errorMsg'
]
=
'参数必填'
if
res
[
'errorCode'
]
==
0
:
# logging.info('推入redis')
# push_data = {'type': 'package', 'result': result}
# r_conn.lpush('tiktok_parcel_data', json.dumps(push_data))
# tiktok_package_declare.delay(**result)
return_res
=
rpc
.
temu_service
.
temu_get_pod_service
(
**
result
)
if
return_res
:
if
return_res
[
'errorMsg'
]:
res
[
'success'
]
=
False
res
[
'errorCode'
]
=
1008
res
[
'errorMsg'
]
=
return_res
[
'errorMsg'
]
else
:
res
[
'result'
]
=
return_res
[
'result'
]
else
:
res
[
'errorCode'
]
=
5000
res
[
'errorMsg'
]
=
'参数未传'
return
jsonify
(
res
)
services/temu_service.py
浏览文件 @
3cc414d5
...
...
@@ -97,7 +97,7 @@ class TemuService(object):
elif
incoming_seq
==
(
current_seq
or
0
)
and
state
!=
'cancel'
:
# 如果需要返回箱号,查一下
carton_res
=
[]
if
str
(
d_mode
)
!=
'
1
'
or
str
(
d_method
)
!=
'3'
:
if
str
(
d_mode
)
!=
'
4
'
or
str
(
d_method
)
!=
'3'
:
cr
.
execute
(
"SELECT carton_no, service_carton_no FROM temu_order_carton WHERE order_id=
%
s"
,
(
order_id
,))
carton_res
=
[{
'cartonNo'
:
row
[
0
],
'logisticsCartonNo'
:
row
[
1
]}
for
row
in
cr
.
fetchall
()]
return_res
[
'result'
]
=
{
...
...
@@ -527,3 +527,355 @@ class TemuService(object):
_logger
.
error
(
f
"写入错误日志失败: {str(log_e)}"
)
return
return_res
@rpc
def
temu_get_track_service
(
self
,
**
kws
):
# 1. 初始化标准返回结构
return_res
=
{
"code"
:
200
,
# 假设 200 为成功
"msg"
:
''
,
"result"
:
{
"trackList"
:
[]
# 默认返回空数组
}
}
try
:
# 2. 安全获取参数,避免 KeyError
# 修复了原代码中 kw = {} 然后 kw['logisticsOrderNo'] 导致报错的问题
data
=
kws
.
get
(
'data'
,
{})
logistics_carton_no
=
data
.
get
(
'logisticsCartonNo'
)
logistics_order_no
=
data
.
get
(
'logisticsOrderNo'
)
# 使用上下文管理器:自动开启事务、提交或回滚
with
db_handle
.
get_cursor
()
as
cr
:
order_id
=
None
# 3. 根据入参查询订单 ID
if
logistics_carton_no
:
sql
=
"SELECT order_id FROM temu_order_carton WHERE service_carton_no=
%
s LIMIT 1"
cr
.
execute
(
sql
,
(
logistics_carton_no
,))
row
=
cr
.
fetchone
()
if
row
:
order_id
=
row
[
0
]
elif
logistics_order_no
:
sql
=
"SELECT id FROM temu_order WHERE logistics_order_no=
%
s LIMIT 1"
cr
.
execute
(
sql
,
(
logistics_order_no
,))
row
=
cr
.
fetchone
()
if
row
:
order_id
=
row
[
0
]
# 如果没查到订单,直接返回空 trackList
if
not
order_id
:
return
return_res
# 4. 拼接并执行轨迹查询 SQL
# 注意:需关联 res_country 获取国家简码,res_country_state 获取州简码,temu_node_config 获取枚举状态
# (关联外键字段名如果是 waybill_id 请把 order_id = %s 改为 waybill_id = %s)
track_sql
=
"""
SELECT
tnc.code AS status, -- 平台轨迹状态码
tnc.code AS operateStatus, -- 服务商操作状态码
tnc.status_translate AS operateContent, -- 轨迹描述 (英文)
tnr.op_time, -- 操作时间
tnr.op_timezone, -- 时区
tnr.op_location, -- 操作地点
rc.code AS country_code, -- 国家二字码 (ISO-3166-1)
rcs.code AS state_code, -- 州/省二字码 (ISO-3166-2)
tnr.city, -- 城市
tnr.zip_code, -- 邮编
tnr.id -- 记录的主键ID
FROM temu_node_record tnr
LEFT JOIN xqh_temu_node tnc ON tnr.node_id = tnc.id
LEFT JOIN res_country rc ON tnr.country_id = rc.id
LEFT JOIN res_country_state rcs ON tnr.state_id = rcs.id
WHERE tnr.waybill_id =
%
s
AND tnr.status = 'valid'
ORDER BY tnr.op_utc_time DESC
"""
cr
.
execute
(
track_sql
,
(
order_id
,))
records
=
cr
.
fetchall
()
# 5. 组装 Temu 要求的 JSON 格式
track_list
=
[]
pushed_record_ids
=
[]
# 【新增】用来收集成功返回的记录ID
for
rec
in
records
:
# 格式化日期:转为 "yyyy-MM-dd HH:mm:ss"
op_time
=
rec
[
3
]
operate_time_str
=
(
op_time
+
timedelta
(
hours
=
8
))
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
if
op_time
else
""
track
=
{
"status"
:
rec
[
0
]
or
""
,
# 平台轨迹状态码
"operateStatus"
:
rec
[
1
]
or
""
,
# 服务商轨迹操作状态码
"operateContent"
:
rec
[
2
]
or
""
,
# 轨迹描述
"operateTime"
:
operate_time_str
,
# 操作时间
"timeZone"
:
self
.
format_timezone
(
rec
[
4
])
or
""
,
# 时区 (如 GMT+08:00)
"operateAddress"
:
rec
[
5
]
or
""
,
# 操作地点
"country"
:
rec
[
6
]
or
""
,
# 国家简码 (如 CN, US)
"state"
:
rec
[
7
]
or
""
,
# 州/省简码
"city"
:
(
rec
[
8
]
or
""
)
.
upper
(),
# 城市,Temu要求全大写
"zip"
:
rec
[
9
]
or
""
# 邮编
}
track_list
.
append
(
track
)
# 【新增】收集第11个字段 (索引为10) 即 tnr.id
pushed_record_ids
.
append
(
rec
[
10
])
return_res
[
'result'
][
'trackList'
]
=
track_list
# 6. 【核心修改】将成功返回的节点记录的 is_pushed 更新为 True
if
pushed_record_ids
:
# 把 ID 列表转为元组传给 SQL。使用 IN 语句批量更新,性能最好
update_sql
=
"UPDATE temu_node_record SET is_pushed = True WHERE id IN
%
s"
cr
.
execute
(
update_sql
,
(
tuple
(
pushed_record_ids
),))
except
Exception
as
err
:
return_res
[
'code'
]
=
500
return_res
[
'msg'
]
=
str
(
err
)
_logger
.
error
(
'temu_get_track_service error:
%
s
\n
%
s'
,
str
(
err
))
return
return_res
# @rpc
# def temu_get_pod_service(self, **kws):
# # 1. 严格按照 Temu 要求的结构初始化返回值
# return_res = {
# "success": True,
# "errorCode": "0",
# "errorMsg": "success",
# "result": {
# "podList": []
# }
# }
# try:
# data = kws.get('data', {})
# logistics_carton_no = data.get('logisticsCartonNo')
# logistics_order_no = data.get('logisticsOrderNo')
# with db_handle.get_cursor() as cr:
# # 2. 根据入参查询关联的附件数据
# # 注意:这里需要根据您的实际 Many2many 关系表名进行修改。
# # 假设您运单表为 temu_order,M2M 关联表为 temu_order_pod_rel
# attachments = []
# if logistics_carton_no:
# # 查大箱维度的 POD
# # 假设大箱表: temu_order_carton, M2M表: temu_carton_pod_rel
# sql = """
# SELECT a.mimetype, a.datas, a.name
# FROM ir_attachment a
# JOIN carton_pod_rel rel ON a.id = rel.attachment_id
# JOIN temu_order_carton c ON c.id = rel.carton_id
# WHERE c.service_carton_no = %s
# LIMIT 10
# """
# cr.execute(sql, (logistics_carton_no,))
# attachments = cr.fetchall()
# elif logistics_order_no:
# # 查运单(订单)维度的 POD
# sql = """
# SELECT a.mimetype, a.datas, a.name
# FROM ir_attachment a
# JOIN waybill_pod_rel rel ON a.id = rel.attachment_id
# JOIN temu_order o ON o.id = rel.waybill_id
# WHERE o.logistics_order_no = %s
# LIMIT 10
# """
# cr.execute(sql, (logistics_order_no,))
# attachments = cr.fetchall()
# # 3. 组装 podList
# pod_list = []
# for att in attachments:
# mimetype = att[0] or ''
# # Odoo 的 datas 字段存的就是 base64 字符串/bytes
# b64_data = att[1] or ''
# file_name = (att[2] or '').lower()
# # 转换为标准字符串格式
# if isinstance(b64_data, bytes):
# b64_data = b64_data.decode('utf-8')
# # 4. 解析文件类型 (严格映射到 png, jpg, jpeg, pdf)
# pod_type = 'jpeg' # 默认给 jpeg 兜底
# if 'pdf' in mimetype or file_name.endswith('.pdf'):
# pod_type = 'pdf'
# elif 'png' in mimetype or file_name.endswith('.png'):
# pod_type = 'png'
# elif 'jpeg' in mimetype or 'jpg' in mimetype or file_name.endswith('.jpg') or file_name.endswith(
# '.jpeg'):
# pod_type = 'jpeg'
# pod_list.append({
# "podType": pod_type,
# "podUrl": "", # 优先使用 Base64,URL 置空即可
# "podBase64": b64_data
# })
# return_res['result']['podList'] = pod_list
# except Exception as err:
# # 发生异常时,按要求修改外层状态
# return_res['success'] = False
# return_res['errorCode'] = "500"
# return_res['errorMsg'] = str(err)
# # result 必须去除或者为空对象,通常报错时可以直接删掉 result 键或保留空列表
# return_res.pop('result', None)
#
# _logger.error('temu_get_pod_service error: %s\n%s', str(err))
#
# return return_res
@rpc
def
temu_get_pod_service
(
self
,
**
kws
):
# 1. 初始化标准返回格式
return_res
=
{
"success"
:
True
,
"errorCode"
:
"0"
,
"errorMsg"
:
""
,
"result"
:
{
"podList"
:
[]
}
}
try
:
data
=
kws
.
get
(
'data'
,
{})
logistics_carton_no
=
data
.
get
(
'logisticsCartonNo'
)
logistics_order_no
=
data
.
get
(
'logisticsOrderNo'
)
with
db_handle
.
get_cursor
()
as
cr
:
# 这里的 pod_url 请替换为你实际在表里加的那个存储链接的字段名!
urls_str
=
""
if
logistics_carton_no
:
# 查大箱表上的 URL 字段
sql
=
"""
SELECT pod_urls_text
FROM temu_order_carton
WHERE service_carton_no =
%
s
LIMIT 1
"""
cr
.
execute
(
sql
,
(
logistics_carton_no
,))
row
=
cr
.
fetchone
()
if
row
:
urls_str
=
row
[
0
]
or
""
elif
logistics_order_no
:
# 查运单(订单)表上的 URL 字段
sql
=
"""
SELECT pod_urls_text
FROM temu_order
WHERE logistics_order_no =
%
s
LIMIT 1
"""
cr
.
execute
(
sql
,
(
logistics_order_no
,))
row
=
cr
.
fetchone
()
if
row
:
urls_str
=
row
[
0
]
or
""
# 2. 如果查到了链接,组装 podList
if
urls_str
:
# 考虑到可能上传了多个文件存成逗号分隔,这里做个 split
url_list
=
urls_str
.
split
(
','
)
pod_list
=
[]
for
url
in
url_list
:
url
=
url
.
strip
()
if
not
url
:
continue
# 简单通过 URL 的后缀来判断类型 (默认给 jpeg 兜底)
url_lower
=
url
.
lower
()
pod_type
=
'jpeg'
if
'pdf'
in
url_lower
:
pod_type
=
'pdf'
elif
'png'
in
url_lower
:
pod_type
=
'png'
elif
'jpg'
in
url_lower
or
'jpeg'
in
url_lower
:
pod_type
=
'jpeg'
clean_url
=
url
if
'?access_token='
in
url
:
# 拆分出前半段和 token 段
parts
=
url
.
split
(
'?access_token='
)
base_link
=
parts
[
0
]
token_part
=
parts
[
1
]
# token_part 可能是 "xxxx-xxxx-xxxx/application/pdf"
# 用 '/' 分割并只取第 0 个元素,就拿到了纯净的 token
real_token
=
token_part
.
split
(
'/'
)[
0
]
# 重新拼装成 Odoo 标准的免密下载链接
clean_url
=
f
"{base_link}?access_token={real_token}"
else
:
# 兜底逻辑:如果 URL 里没有 access_token,直接暴破替换常见的结尾
for
ext
in
[
'/application/pdf'
,
'/image/png'
,
'/image/jpeg'
,
'/pdf'
,
'/png'
,
'/jpeg'
,
'/jpg'
]:
if
clean_url
.
lower
()
.
endswith
(
ext
):
clean_url
=
clean_url
[:
-
len
(
ext
)]
# 截断后缀
break
pod_list
.
append
({
"podType"
:
pod_type
,
"podUrl"
:
clean_url
,
# 直接塞入你存好的完整 URL
"podBase64"
:
None
# Base64 留空
})
# 【注意】Temu 限制最多返回 10 个
return_res
[
'result'
][
'podList'
]
=
pod_list
[:
10
]
# 3. 【新增】:如果成功返回了凭证,利用 SQL 将状态更新为 True
# ============================================================
if
pod_list
:
# 确保确实有有效的文件被返回
if
logistics_carton_no
:
# 查的是大箱,只更新该大箱的同步标记
update_carton_sql
=
"""
UPDATE temu_order_carton
SET is_sync_temu_pod = True
WHERE service_carton_no =
%
s
"""
cr
.
execute
(
update_carton_sql
,
(
logistics_carton_no
,))
elif
logistics_order_no
:
# 查的是运单,更新主运单的同步标记
update_order_sql
=
"""
UPDATE temu_order
SET is_sync_temu_pod = True
WHERE logistics_order_no =
%
s
"""
cr
.
execute
(
update_order_sql
,
(
logistics_order_no
,))
# 💡 额外保险逻辑:如果 Temu 是按订单拉取成功的,通常意味着整单已同步,
# 可以顺便把该订单下属的**所有大箱**的复选框也勾上。
# update_all_cartons_sql = """
# UPDATE temu_order_carton
# SET is_sync_temu_pod = True
# WHERE order_id = (
# SELECT id FROM temu_order WHERE logistics_order_no = %s LIMIT 1
# )
# """
# cr.execute(update_all_cartons_sql, (logistics_order_no,))
# ============================================================
except
Exception
as
err
:
return_res
[
'success'
]
=
False
return_res
[
'errorCode'
]
=
"500"
return_res
[
'errorMsg'
]
=
str
(
err
)
return_res
.
pop
(
'result'
,
None
)
_logger
.
error
(
'temu_get_pod_service error:
%
s
\n
%
s'
,
str
(
err
))
return
return_res
def
format_timezone
(
self
,
tz_str
):
"""
将各种不规范的时区字符串格式化为 Temu 标准格式: GMT±HH:mm
测试用例:
"+8" -> "GMT+08:00"
"-1" -> "GMT-01:00"
"0" -> "GMT+00:00"
"GMT+8:30" -> "GMT+08:30"
"""
if
not
tz_str
:
return
""
# 1. 统一转字符串并去除首尾空格、大写化
tz_str
=
str
(
tz_str
)
.
strip
()
.
upper
()
# 2. 移除可能包含的 GMT 或 UTC 前缀
tz_str
=
tz_str
.
replace
(
"GMT"
,
""
)
.
replace
(
"UTC"
,
""
)
.
strip
()
if
not
tz_str
:
return
"GMT+00:00"
# 3. 提取符号 (正负号)
sign
=
"+"
if
tz_str
.
startswith
(
"-"
):
sign
=
"-"
tz_str
=
tz_str
[
1
:]
elif
tz_str
.
startswith
(
"+"
):
sign
=
"+"
tz_str
=
tz_str
[
1
:]
# 4. 提取小时和分钟
try
:
if
":"
in
tz_str
:
# 处理 "8:00" 这种格式
parts
=
tz_str
.
split
(
":"
)
hours
=
int
(
parts
[
0
])
if
parts
[
0
]
else
0
mins
=
int
(
parts
[
1
])
if
len
(
parts
)
>
1
and
parts
[
1
]
else
0
elif
"."
in
tz_str
:
# 兼容极少见的浮点数输入,比如印度时区 5.5 -> 5小时30分
val
=
float
(
tz_str
)
hours
=
int
(
val
)
mins
=
int
(
round
((
val
-
hours
)
*
60
))
else
:
# 处理纯数字,比如 "8", "0", "1"
hours
=
int
(
tz_str
)
mins
=
0
# 5. 格式化为两位数拼接
return
f
"GMT{sign}{hours:02d}:{mins:02d}"
except
Exception
:
# 如果解析失败,原样返回或返回默认空值,避免阻断主流程
return
""
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论