提交 e2472b49 authored 作者: 刘擎阳's avatar 刘擎阳

1.xqh_temu服务

上级 470d7909
.idea/
*.pyc
FROM coding-public-docker.pkg.coding.net/public/docker/python:3.8
COPY . /app
WORKDIR /app
RUN /usr/local/bin/python3 -m pip install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple
RUN ls
RUN pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
RUN pip install flask_nameko==1.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
ENTRYPOINT ["python3", "manage.py"]
# -*- coding: utf-8 -*-
from logging.handlers import RotatingFileHandler
from flask import Flask
import logging
from flask_nameko import FlaskPooledClusterRpcProxy
from config import NAMEKO_CONFIG
rpc = FlaskPooledClusterRpcProxy()
def create_app():
app = Flask(__name__)
app.config.update(dict(
NAMEKO_AMQP_URI=NAMEKO_CONFIG['AMQP_URI'],
NAMEKO_MAX_CONNECTIONS=20,
))
rpc.init_app(app)
# 蓝图注册
from api import api as api_blueprint
app.register_blueprint(api_blueprint)
# 设置日志的记录等级
logging.basicConfig(level=logging.DEBUG) # 调试debug级
# 创建日志记录器,指明日志保存的路径、每个日志文件的最大大小、保存的日志文件个数上限
file_log_handler = RotatingFileHandler("logs/log.log", maxBytes=1024 * 1024 * 100, backupCount=10, encoding='utf-8')
# 创建日志记录的格式 日志等级 输入日志信息的文件名 行数 日志信息
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')
# 为刚创建的日志记录器设置日志记录格式
file_log_handler.setFormatter(formatter)
# 为全局的日志工具对象(flask app使用的)添加日志记录器
logging.getLogger().addHandler(file_log_handler)
return app
import requests
import json
def test_create_order():
# 1. 接口地址
# 注意:根据你的描述,路由是 /temu/create/order,基础地址是 http://127.0.0.1:5000
url = "http://127.0.0.1:5000/temu/create/order"
# 2. 请求头
headers = {
"Content-Type": "application/json"
}
# 3. 请求数据 (你提供的数据)
payload = {
"sign": "2DE2BA70E44A1F64855FE5B4D404EC13",
"orderNo": "2343155111",
"channelCode": "channelCode",
"warehouse": "warehouse",
"sequence": 9,
"deliverMethod": 1,
"deliveryMode": 1,
"electrified": 0,
"exportLicenseSource": 1,
"companyInfo": {
"companyName": "gsmccs",
"phone": "111",
"email": "111@163.com"
},
"shippingInfo": {
"fullName": "发仓联系人",
"regionName1": "CN",
"regionName2": "上海市",
"regionName3": "上海市",
"regionName4": "长宁区",
"detailedAddress": "上海长宁大厦",
"email": "111@163.com",
"phone": "111"
},
"destinationInfo": {
"fullName": "海外仓联系人",
"regionName1": "US",
"regionName2": "California",
"regionName3": "Acton",
"regionName4": "",
"detailedAddress": "test1",
"postcode": "111",
"email": "111@163.com",
"phone": "111",
"warehouseNo": "12689079385"
},
"cartonInfo": [
{
"cartonNo": "2125",
"weight": 2,
"weightUnit": "KG",
"length": 100,
"width": 100,
"height": 100,
"lengthUnit": "CM",
"electrified": 0,
"skuInfo": [
{
"id": "111",
"quantity": 1
}
]
}
],
"containerInfo": [
{
"containerType": "abc",
"containerCnt": 1,
"containerWeight": 123.4,
"containerWeightUnit": "kg",
"containerVolume": 432.1,
"containerVolumeUnit": "CBM"
}
],
"interface_type": "bg.intligst.semihead.create",
"scene_id": 7017820,
"source_app_key": "f219476226359894f8f3a2b70568e1cf",
"target_app_key": "demo-e1a4a8688f0678ef3de8e2b4877"
}
try:
# 4. 发送 POST 请求
print(f"正在发送 POST 请求到: {url}")
response = requests.post(url, headers=headers, json=payload)
# 5. 打印响应状态码和内容
print(f"响应状态码: {response.status_code}")
# 尝试解析 JSON 响应
try:
resp_json = response.json()
print("响应内容 (JSON):")
print(json.dumps(resp_json, indent=4, ensure_ascii=False))
# 简单的结果判断
if resp_json.get("success"):
print(">>> 测试通过:下单成功")
else:
print(f">>> 测试失败:{resp_json.get('errorMsg')}")
except json.JSONDecodeError:
print("响应内容 (非 JSON):")
print(response.text)
except requests.exceptions.ConnectionError:
print(">>> 连接失败:请确保 Flask 服务 (127.0.0.1:5000) 已启动")
except Exception as e:
print(f">>> 发生错误: {str(e)}")
if __name__ == "__main__":
test_create_order()
# -*- coding: utf-8 -*-
# __author__ = 'holly'
# 蓝图注册
from flask import Blueprint
api = Blueprint('api', __name__)
from . import temu_api
# !/usr/bin/python
# -*- coding:utf-8 -*-
# Author: Zhichang Fu
# Created Time: 2019-03-27 07:58:11
'''
BEGIN
function:
Comment API
return:
code:0 success
END
'''
import time
import json
import requests
import logging
from datetime import datetime, timedelta
from flask import request, jsonify
from nameko.standalone.rpc import ClusterRpcProxy
from config import NAMEKO_CONFIG
from . import api
from __init__ import rpc
from util import check_customer
CONFIG = NAMEKO_CONFIG
_logger = logging.getLogger(__name__)
# from redis_conn import redis_connection
# r_conn = redis_connection()
# from services.tasks import tiktok_package_declare
# schema:
# id: data
# properties:
# verification_code:
# type: string
# description: 验证码
# example: 1234
@api.route('/temu/create/order', methods=['post'])
# @check_customer
def temu_create_order():
res = {
"success": True,
"errorCode": 0,
"errorMsg": "success",
"requestID": "202312251715021060522200417739B9",
"serverTimeMs": int(time.time() * 1000),
"result": None,
}
request_time = datetime.utcnow()
timestamp = int(time.time())
# res['ts'] = request_time.strftime("%Y-%m-%d %H:%M:%S")
res['requestID'] = request_time.strftime("%Y%m%d%H%M%S") + str(timestamp)
request_data = request.get_json()
_logger.info('temu_create_order:%s' % request_data)
result = {}
if request_data:
# print(type(request_data))
data = request_data
result['request_id'] = res['requestID']
result['data'] = data
provider_order_no = data['orderNo'] if data.get('orderNo') else ''
sequence = data['sequence'] if data.get('sequence') else ''
if not provider_order_no:
res['errorCode'] = 1007
res['errorMsg'] = 'TEMU发货单号必填'
if not sequence:
res['errorCode'] = 1007
res['errorMsg'] = '版本号必填'
if res['errorCode'] == 0:
# logging.info('推入redis')
# push_data = {'type': 'package', 'result': result}
# r_conn.lpush('tiktok_parcel_data', json.dumps(push_data))
# tiktok_package_declare.delay(**result)
return_res = rpc.temu_service.temu_create_order_service(**result)
if return_res:
if return_res['msg']:
res['success'] = False
res['errorCode'] = 1008
res['errorMsg'] = return_res['msg']
else:
res['result'] = return_res['result']
else:
res['errorCode'] = 5000
res['errorMsg'] = '参数未传'
return jsonify(res)
@api.route('/temu/update/order', methods=['post'])
# @check_customer
def temu_update_order():
# 接收提单信息、大包与小包的关联信息
res = {
"code": 0,
"msg": "success",
"requestID": "202312251715021060522200417739B9",
"ts": "2023-12-25 17:15:03"
}
request_time = datetime.utcnow()
timestamp = int(time.time())
res['ts'] = request_time.strftime("%Y-%m-%d %H:%M:%S")
res['requestID'] = request_time.strftime("%Y%m%d%H%M%S") + str(timestamp)
request_data = request.form['param_json']
_logger.info('mawb_declare kw:%s' % request_data)
if request_data:
request_data = json.loads(request_data)
request_data['request_id'] = res['requestID']
master_waybill_no = request_data['master_waybill_no'] if request_data.get('master_waybill_no') else ''
mwb_info = request_data['mwb_info'] if request_data.get('mwb_info') else ''
if not master_waybill_no:
res['code'] = 1007
res['msg'] = '提单号必填'
if not mwb_info:
res['code'] = 1007
res['msg'] = '提单信息必传'
if res['code'] == 0:
logging.info('推入redis')
push_data = {'type': 'mawb', 'result': request_data}
r_conn.lpush('tiktok_parcel_data', json.dumps(push_data))
# return_res = rpc.customs_tiktok.mawb_declare(**request_data)
# if return_res:
# if return_res['err_msg']:
# res['code'] = 5000
# res['msg'] = return_res['err_msg']
else:
res['code'] = 5000
res['msg'] = '参数未传'
return jsonify(res)
@api.route('/temu/query/order', methods=['post'])
# @check_customer
def temu_query_order():
# 接收提单的附件信息
res = {
"code": 0,
"msg": "success",
"requestID": "202312251715021060522200417739B9",
"ts": "2023-12-25 17:15:03"
}
request_time = datetime.utcnow()
timestamp = int(time.time())
res['ts'] = request_time.strftime("%Y-%m-%d %H:%M:%S")
res['requestID'] = request_time.strftime("%Y%m%d%H%M%S") + str(timestamp)
request_data = request.form['param_json']
_logger.info('mawb_copy_upload kw:%s' % request_data)
if request_data:
request_data = json.loads(request_data)
request_data['request_id'] = res['requestID']
master_waybill_no = request_data['master_waybill_no'] if request_data.get('master_waybill_no') else ''
if not master_waybill_no:
res['code'] = 1007
res['msg'] = '提单号必填'
if res['code'] == 0:
return_res = rpc.customs_tiktok.mawb_copy_upload(**request_data)
if return_res:
if return_res['msg']:
res['code'] = return_res['code']
res['msg'] = return_res['msg']
else:
res['code'] = 5000
res['msg'] = '参数未传'
return jsonify(res)
@api.route('/temu/cancel/order', methods=['post'])
# @check_customer
def temu_cancel_order():
# 接收取消订单
res = {
"success": True,
"errorCode": 0,
"errorMsg": "success",
"requestID": "202312251715021060522200417739B9",
"serverTimeMs": int(time.time() * 1000)
}
request_time = datetime.utcnow()
timestamp = int(time.time())
res['ts'] = request_time.strftime("%Y-%m-%d %H:%M:%S")
res['requestID'] = request_time.strftime("%Y%m%d%H%M%S") + str(timestamp)
request_data = request.form['param_json']
_logger.info('temu_cancel_order kw:%s' % request_data)
result = {}
if request_data:
data = json.loads(request_data)
result['request_id'] = res['requestID']
result['data'] = data
provider_order_no = data['orderNo'] if data.get('orderNo') else ''
logistics_order_no = data['logisticsOrderNo'] if data.get('logisticsOrderNo') else ''
if not provider_order_no:
res['errorCode'] = 1007
res['errorMsg'] = 'TEMU发货单号必填'
if not logistics_order_no:
res['errorCode'] = 1007
res['errorMsg'] = '服务商发货单号必填'
if res['code'] == 0:
return_res = rpc.temu_service.temu_cancel_order_service(**result)
if return_res:
if return_res['msg']:
res['errorCode'] = 1008
res['errorMsg'] = return_res['msg']
else:
res['code'] = 5000
res['msg'] = '参数未传'
return jsonify(res)
@api.route('/logistics/provider/customs/mawb_cancel', methods=['post'])
# @check_customer
def mawb_cancel():
# 接收提单取消
res = {
"code": 0,
"msg": "success",
"requestID": "202312251715021060522200417739B9",
"ts": "2023-12-25 17:15:03"
}
request_time = datetime.utcnow()
timestamp = int(time.time())
res['ts'] = request_time.strftime("%Y-%m-%d %H:%M:%S")
res['requestID'] = request_time.strftime("%Y%m%d%H%M%S") + str(timestamp)
request_data = request.form['param_json']
_logger.info('mawb_cancel kw:%s' % request_data)
if request_data:
request_data = json.loads(request_data)
request_data['request_id'] = res['requestID']
master_waybill_no = request_data['master_waybill_no'] if request_data.get('master_waybill_no') else ''
if not master_waybill_no:
res['code'] = 1007
res['msg'] = '提单号必传'
if res['code'] == 0:
return_res = rpc.customs_tiktok.mawb_cancel(**request_data)
if return_res:
if return_res['msg']:
res['code'] = return_res['code']
res['msg'] = return_res['msg']
else:
res['code'] = 5000
res['msg'] = '参数未传'
return jsonify(res)
# !/usr/bin/python
# -*- coding:utf-8 -*-
# Author: holly
# Created Time: 2019-03-26 23:15:39
"""
BEGIN
function:
config
END
"""
import os
# rabbit_ip = os.environ.get('RABBIT_IP', 'rabbit')
# # # 正式
# # NAMEKO_CONFIG = {'AMQP_URI': "amqp://hh_guest:guest@%s" % rabbit_ip}
# #
# # db_ip = os.environ.get('DB_IP', 'db')
# # db_port = "5432"
# # db_name = "hh_airorder"
# # db_user = "hh"
# # db_password = "hh123../"
# rabbit_ip = os.environ.get('RABBIT_IP', 'rabbit')
# # 测试环境
# NAMEKO_CONFIG = {'AMQP_URI': "amqp://guest:guest@%s" % rabbit_ip}
#
# db_ip = 'db'
# db_port = "5432"
# db_name = "airorder_tiktok"
# db_user = "khg"
# db_password = "khg"
# 本地
NAMEKO_CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}
# 数据库配置
db_ip = "127.0.0.1"
db_port = "5431"
db_name = "test_xqhwaybill"
db_user = "odoo14"
db_password = "qq166349"
# swagger配置
title = 'Temu API'
description = ''
#
# odoo_url = "https://otm.f.yizuo.ltd"
redis_options = dict(
host='127.0.0.1',
port=6379,
# password='topodoo1314',
decode_responses=True,
db=0
)
import config
import psycopg2
from sqlalchemy import create_engine
class DbService(object):
def __init__(self):
print('new connection')
self.conn_engine = create_engine('postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}'.format(
username=config.db_user, password=config.db_password, host=config.db_ip, port=config.db_port,
database=config.db_name))
self.pg_conn = psycopg2.connect(database=config.db_name, user=config.db_user,
password=config.db_password, host=config.db_ip, port=config.db_port)
# -*- coding:utf-8 -*-
import argparse
from flasgger import Swagger
from flask_cors import CORS
from __init__ import create_app
import config
parser = argparse.ArgumentParser()
parser.add_argument("--port", help="app running port", type=int, default=5000)
parse_args = parser.parse_args()
app = create_app()
swagger_config = Swagger.DEFAULT_CONFIG
swagger_config['title'] = config.title
swagger_config['description'] = config.description
Swagger(app)
CORS(app, supports_credentials=True)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=int(parse_args.port), debug=True)
# -*- coding: utf-8 -*-
import config
import redis
import logging
import psycopg2
_logger = logging.getLogger(__name__)
def redis_connection():
# 连接redis
redis_config = config.redis_options
if redis_config:
try:
redis_options = dict(
host=redis_config.get('host'),
port=redis_config.get('port'),
# password=redis_config.get('password'),
decode_responses=True,
db=redis_config.get('db'),
)
# print(redis_options)
pool = redis.ConnectionPool(**redis_options)
r = redis.Redis(connection_pool=pool)
return r
except Exception as e:
_logger.error(u'连接redis失败,原因:%s' % str(e))
return ''
else:
_logger.error(u'conf文件中未配置redis连接信息')
return
pytest-runner
flask
flasgger
nameko
flask_cors
psycopg2
sqlalchemy
pandas
PyJWT
import time
import hashlib
import base64
import json
from datetime import timedelta, datetime
import jwt
from db_service import DbService
import functools
import logging
import pandas as pd
from flask import request, jsonify
from config import NAMEKO_CONFIG
TIMEOUT_TIME = 1000
db_handle = DbService().conn_engine
_logger = logging.getLogger(__name__)
def check_customer(func):
@functools.wraps(func)
def wrapper(*args, **kw):
res = {
"code": 0,
"msg": "success",
"requestID": "202312251715021060522200417739B9",
"ts": "2023-12-25 17:15:03"
}
request_time = datetime.utcnow()
current_timestamp = int(time.time())
res['ts'] = request_time.strftime("%Y-%m-%d %H:%M:%S")
res['requestID'] = request_time.strftime("%Y%m%d%H%M%S") + str(current_timestamp)
# 获取传输的值
sign = kw['sign'] if kw.get('sign') else ""
param_json_str = kw['param_json'] if kw.get('param_json') else "{}"
timestamp = kw['timestamp'] if kw.get('timestamp') else ""
version = kw['version'] if kw.get('version') else ""
app_key = kw['app_key'] if kw.get('app_key') else ""
app_secret = request.env["ir.config_parameter"].sudo().get_param('tt_app_secret') or ''
_logger.info('request_data:%s' % kw)
if kw.get('param_json'):
# param_json = json.loads(kw['param_json'])
# param_json_str = json.dumps(param_json, ensure_ascii=False)
check_sign = request.env["ao.tt.api"].sudo().make_sign(app_key, app_secret, version,
timestamp, param_json_str)
# print(check_sign)
if sign != check_sign:
res['code'] = 1004
res['msg'] = '验证失败'
# 检查时间的有效性
if res['code'] == 0:
# try:
millis = time.time()
_logger.info(u'时间戳:%s' % millis)
now = datetime.now()
timestamp1 = int(kw['timestamp'])
_logger.info(u'时间戳:%s' % timestamp1)
datetime_timestamp = datetime.fromtimestamp(timestamp1)
_logger.info(u'datetime_timestamp:%s' % datetime_timestamp)
_logger.info(u'now:%s' % now)
if now > datetime_timestamp:
difference = (now - datetime_timestamp).seconds
else:
difference = 0
_logger.info(u'时间差:%s' % difference)
if difference > TIMEOUT_TIME:
res['code'] = 1005
res['msg'] = '请求过期'
_logger.warning(u'时间戳已过期')
# _logger.info('response:%s' % rets['response'])
if res['code'] == 0:
return func(*args, **kw)
else:
# return func(*args, **kw)
return json.JSONEncoder().encode(res)
return wrapper
# jwt auth方式生成token
def get_token(username):
payload = {
'exp': datetime.datetime.now() + timedelta(hours=24), # 令牌过期时间
'username': str(username) # 想要传递的信息,如用户名ID
}
key = 'yunhangji'
encoded_jwt = jwt.encode(payload, key, algorithm='HS256')
return encoded_jwt
# token解码 {'exp': 1603984192, 'username': 'BigFish'}
def check_token(func):
@functools.wraps(func)
def wrapper(*args, **kw):
rets = {
"status": "1",
'error_message': ''
}
request_headers = request.headers
logging.info('check_token_request_method:%s' % request.method)
logging.info('check_token_request_headers:%s' % request_headers)
if request_headers.get('token'):
# 检查是否存在这个客户
logging.info('token: %s ' % request_headers['token'])
try:
res = jwt.decode(request_headers['token'], 'yunhangji', algorithms=['HS256'])
logging.info('check_token_res:%s' % res)
if request.method != 'GET' and request.path != '/v1/servicer/info/update':
request_data = request.json
else:
request_data = request.args
logging.info('check_token_request_data:%s' % request_data)
if request_data.get('servicer_id'):
if res['username'] != str(request_data['servicer_id']):
rets['status'] = 103
rets['error_message'] = 'token不属于当前用户'
else:
rets['status'] = 105
rets['error_message'] = '请传入参数名为servicer_id的服务商id'
except Exception as ex:
rets['status'] = 102
rets['error_message'] = str(ex)
else:
rets['status'] = 101
rets['error_message'] = u'请在请求头中传入token参数.'
logging.warning(u'请传入token参数')
logging.info('status:%s' % rets['status'])
if rets['status'] == "1":
return func(*args, **kw)
else:
return rets
return wrapper
{
"sign": "2DE2BA70E44A1F64855FE5B4D404EC13",
"orderNo": "2343155111",
"channelCode": "channelCode",
"warehouse": "warehouse",
"sequence": 9,
"deliverMethod": 1,
"deliveryMode": 1,
"electrified": 0,
"exportLicenseSource": 1,
"companyInfo": {
"companyName": "gsmccs",
"phone": "111",
"email": "111@163.com"
},
"shippingInfo": {
"fullName": "发仓联系人",
"regionName1": "CN",
"regionName2": "上海市",
"regionName3": "上海市",
"regionName4": "长宁区",
"detailedAddress": "上海长宁大厦",
"email": "111@163.com",
"phone": "111"
},
"destinationInfo": {
"fullName": "海外仓联系人",
"regionName1": "US",
"regionName2": "California",
"regionName3": "Acton",
"regionName4": "",
"detailedAddress": "test1",
"postcode": "111",
"email": "111@163.com",
"phone": "111",
"warehouseNo": "12689079385"
},
"cartonInfo": [
{
"cartonNo": "2125",
"weight": 2,
"weightUnit": "KG",
"length": 100,
"width": 100,
"height": 100,
"lengthUnit": "CM",
"electrified": 0,
"skuInfo": [
{
"id": "111",
"quantity": 1
}
]
}
],
"containerInfo": [
{
"containerType": "abc",
"containerCnt": 1,
"containerWeight": 123.4,
"containerWeightUnit": "kg",
"containerVolume": 432.1,
"containerVolumeUnit": "CBM"
}
],
"interface_type": "bg.intligst.semihead.create",
"scene_id": 7017820,
"source_app_key": "f219476226359894f8f3a2b70568e1cf",
"target_app_key": "demo-e1a4a8688f0678ef3de8e2b4877"
}
\ No newline at end of file
from .redis_service import RedisService
# from .db_service import DbService
from .db_service import db_handle
# !/usr/bin/python
# -*- coding:utf-8 -*-
# Author: holly
# Created Time: 2021-08-02
"""
BEGIN
function:
config
END
"""
import os
# 生产环境
# redis配置
# REDIS_NAME = "nameko-redis"
# REDIS_HOST = os.environ.get("REDIS_HOST", "some-redis")
# REDIS_PORT = int(os.environ.get("REDIS_PORT", 32768))
# REDIS_DB = int(os.environ.get("REDIS_DB", 1))
# postgresql数据库配置
db_ip = '127.0.0.1'
db_port = "5431"
db_name = "test_xqhwaybill"
db_user = "odoo14"
db_password = "qq166349"
# odoorpc配置
rpc_db_ip = '127.0.0.1'
rpc_db_port = "8069"
rpc_db_name = "hh_ccs_425"
rpc_db_user = "admin"
rpc_db_password = "admin"
# 测试环境
# redis配置
# REDIS_NAME = "nameko-redis"
# REDIS_HOST = os.environ.get("REDIS_HOST", "redis")
# REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
# REDIS_DB = int(os.environ.get("REDIS_DB", 9))
# postgresql数据库配置
# db_ip = os.environ.get('DB_IP', 'db')
# db_port = "5432"
# db_name = "airorder_tiktok"
# db_user = "khg"
# db_password = "khg"
# 本地
# redis配置
REDIS_NAME = "nameko-redis"
REDIS_HOST = os.environ.get("REDIS_HOST", "127.0.0.1")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_DB = int(os.environ.get("REDIS_DB", 0))
#
# # postgresql数据库配置
# db_ip = "127.0.0.1"
# db_port = "5432"
# db_name = "yhj"
# db_user = "odoo"
# db_password = "odoo"
redis_options = dict(
host='127.0.0.1',
port=6379,
# password='topodoo1314',
decode_responses=True,
db=0
)
# !/usr/bin/python
# -*- coding:utf-8 -*-
# Author: Zhichang Fu
# Created Time: 2019-03-26 23:14:11
'''
BEGIN
function:
db Service
return:
code:0 success
END
'''
# import json
# from . import config
# import psycopg2
# from sqlalchemy import create_engine
#
#
# class DbService(object):
# def __init__(self):
# print('new connection')
# self.conn_engine = create_engine('postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}'.format(
# username=config.db_user, password=config.db_password, host=config.db_ip, port=config.db_port,
# database=config.db_name), pool_size=40)
#
# self.pg_conn = psycopg2.connect(database=config.db_name, user=config.db_user,
# password=config.db_password, host=config.db_ip, port=config.db_port)
#
#
from sqlalchemy import create_engine
from . import config # 假设你有这个配置文
from contextlib import contextmanager
class DbService(object):
def __init__(self):
# 创建连接池 (SQLAlchemy 默认包含 QueuePool)
# pool_size=40: 保持40个连接
# max_overflow=10: 负载高时允许额外再创10个
self.conn_engine = create_engine(
'postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}'.format(
username=config.db_user, password=config.db_password,
host=config.db_ip, port=config.db_port, database=config.db_name
),
pool_size=15,
max_overflow=5,
pool_recycle=3600 # 1小时回收连接,防止 MySQL/PG 断连
)
@contextmanager
def get_cursor(self):
"""
上下文管理器:自动获取连接、创建游标、提交事务、关闭连接
"""
# 从连接池获取一个原生 psycopg2 连接
conn = self.conn_engine.raw_connection()
cursor = conn.cursor()
try:
yield cursor
conn.commit() # 成功则提交
except Exception as e:
conn.rollback() # 出错回滚
raise e
finally:
cursor.close()
conn.close() # 归还连接给连接池 (不是真的关闭 TCP)
# 初始化全局 DB 实例
db_handle = DbService()
# !/usr/bin/python
# -*- coding:utf-8 -*-
# Author: Zhichang Fu
# Created Time: 2019-03-26 23:14:11
'''
BEGIN
function:
Redis Service
return:
code:0 success
END
'''
import redis
import json
from . import config
class RedisClient(object):
"""
redis client
"""
redis_client = {}
@staticmethod
def reload_redis(host, port, select_db):
"""
function: reload redis object
"""
return redis.StrictRedis(
host=host,
port=port,
db=select_db,
password="",
decode_responses=True)
@classmethod
def get_redis(cls, redis_name, host, port, select_db):
"""
function: get redis client
"""
if redis_name not in cls.redis_client:
cls.redis_client[redis_name] = cls.reload_redis(
host, port, select_db)
return cls.redis_client.get(redis_name)
class RedisService(object):
def __init__(self):
self.redis_instance = RedisClient.get_redis(
config.REDIS_NAME, config.REDIS_HOST, config.REDIS_PORT,
config.REDIS_DB)
self.users_key = "users"
self.users_data_key = "users_data"
def check_registered_and_get_info(self, u_id):
"""
Check if the user is registered and return user information \
if registered.
"""
user_data = self.redis_instance.hget(self.users_data_key, u_id)
if not user_data:
return False, None
return True, json.loads(user_data)
def check_email_is_registered(self, email):
u_id = self.redis_instance.hget(self.users_key, email)
return u_id
def register(self, u_id, email, data):
self.redis_instance.hset(self.users_key, email, u_id)
result = self.redis_instance.hset(self.users_data_key, u_id,
json.dumps(data))
return result
\ No newline at end of file
# !/usr/bin/python
# -*- coding:utf-8 -*-
import datetime
from random import Random
import pytz
from . import config
import odoorpc
week_day_dict = {
0: 'Mon',
1: 'Tue',
2: 'Wed',
3: 'Thu',
4: 'Fri',
5: 'Sat',
6: 'Sun',
}
class YhjCommon(object):
"""
common
"""
@classmethod
def get_location_time(cls):
"""
获取当前时区时间(带时区)
:return:
"""
now_time = datetime.datetime.utcnow()
return now_time
@classmethod
def get_add_time(cls, parse_time):
"""
把时间增加8小时
:return:
"""
dt = datetime.datetime.strptime(parse_time, "%Y-%m-%d %H:%M:%S")
d = dt + datetime.timedelta(hours=8)
nTime = d.strftime("%Y-%m-%d %H:%M:%S")
return nTime
@classmethod
def get_item_time(cls, parse_time, days, way):
"""
时间增加还是减少指定天数
:return:
"""
dt = datetime.datetime.strptime(parse_time.strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
if way == 'add':
d = dt + datetime.timedelta(days=days)
else:
d = dt - datetime.timedelta(days=days)
expire_date = d.strftime("%Y-%m-%d %H:%M:%S")
return expire_date
@classmethod
def get_time_zone_week(cls, parse_time, hour='', time_zone=''):
"""
根据时区获取时间,转化为轨迹所需的格式
:return:
"""
hours = hour
if '+' in hours:
hours = hours.replace('+', '')
if '-' in hours:
hours = -int(hours.replace('-', ''))
dt = datetime.datetime.strptime(parse_time.strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
if hours:
d = dt + datetime.timedelta(hours=int(hours))
else:
d = dt
day = d.weekday()
week = week_day_dict[day]
nTime = d.strftime("%m-%d %H:%M")
Time = '%s,' % week + nTime + time_zone
return Time
@classmethod
def get_time_zone_current(cls, parse_time, hour='', is_second=True):
"""
根据时区获取时间,转化为轨迹所需的格式
:return:
"""
hours = hour
if '+' in hours:
hours = hours.replace('+', '')
if '-' in hours:
hours = -int(hours.replace('-', ''))
dt = datetime.datetime.strptime(parse_time.strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
if hours:
d = dt + datetime.timedelta(hours=int(hours))
else:
d = dt
if is_second:
nTime = d.strftime("%Y-%m-%d %H:%M:%S")
else:
nTime = d.strftime("%Y-%m-%d %H:%M")
Time = nTime + '(%s)' % hour
return Time
@classmethod
def get_time_zone_current_str(cls, parse_time, hour=''):
"""
根据0时区时间和时区获取当地时间
:return:
"""
hours = hour
if '+' in hours:
hours = hours.replace('+', '')
if '-' in hours:
hours = -int(hours.replace('-', ''))
dt = datetime.datetime.strptime(parse_time, "%Y-%m-%d %H:%M:%S")
if hours:
d = dt + datetime.timedelta(hours=int(hours))
else:
d = dt
nTime = d.strftime("%Y-%m-%d %H:%M:%S")
Time = nTime + '(%s)' % hour
return Time
@classmethod
def get_time(cls, parse_time, way, hour=0):
"""
时间增加或者减少小时
:return:
"""
hours = hour
dt = datetime.datetime.strptime(parse_time.strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
if way == 'add':
d = dt + datetime.timedelta(hours=hours)
else:
d = dt - datetime.timedelta(hours=hours)
expire_date = d.strftime("%Y-%m-%d %H:%M:%S")
return expire_date
@classmethod
def get_utc_time(cls, parse_time, time_zone):
"""
根据时区 时间获取零时区的时间
:return:
"""
if not time_zone:
time_zone = ''
if '-' in time_zone:
time_zone = time_zone.replace('-', '')
if '+' in time_zone:
time_zone = -int(time_zone.replace('+', ''))
dt = datetime.datetime.strptime(parse_time, "%Y-%m-%d %H:%M:%S")
if time_zone:
d = dt + datetime.timedelta(hours=int(time_zone))
nTime = d.strftime("%Y-%m-%d %H:%M:%S")
else:
nTime = dt.strftime("%Y-%m-%d %H:%M:%S")
return nTime
@classmethod
def get_time_zone(cls, parse_time, time_zone):
"""
根据零时区时间转化成当前时区的时间
:return:
"""
if not time_zone:
time_zone = ''
if '+' in time_zone:
time_zone = time_zone.replace('+', '')
if '-' in time_zone:
time_zone = -int(time_zone.replace('-', ''))
dt = datetime.datetime.strptime(parse_time.strftime("%Y-%m-%d %H:%M:%S"), "%Y-%m-%d %H:%M:%S")
if time_zone:
d = dt + datetime.timedelta(hours=int(time_zone))
nTime = d.strftime("%Y-%m-%d %H:%M:%S")
else:
nTime = dt.strftime("%Y-%m-%d %H:%M:%S")
return nTime
@classmethod
def random_str(cls, randomlength=8):
str = ''
chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
length = len(chars) - 1
random = Random()
for i in range(randomlength):
str += chars[random.randint(0, length)]
return str
@classmethod
def state_seq(cls, state):
state_list = ['已下单', '客户发货', '仓库收货', '货物安检', '货物入库', '发出', '到达', '尾程入仓', '卡运揽收', '卡运签收', '提货', '完成', '取消']
if state in state_list:
return state_list.index(state)
else:
return 14
@classmethod
def get_time_str(self, parse_time):
"""
把时间减去8小时
:return:
"""
dt = datetime.datetime.strptime(parse_time, "%Y-%m-%d %H:%M:%S")
nTime = dt.strftime("%Y-%m-%d %H:%M:%S")
return nTime
# class Order_dispose(object):
#
# def __init__(self):
# # rpc连接
# self.odoo_db = odoorpc.ODOO(config.rpc_db_ip, port=config.rpc_db_port)
# self.odoo_db.login(config.rpc_db_name, config.rpc_db_user, config.rpc_db_password)
\ No newline at end of file
# foobar.yaml
# 本地/测试环境
#AMQP_URI: pyamqp://guest:guest@${RABBIT_IP:rabbit}
# 生产环境
AMQP_URI: pyamqp://hh_guest:guest@${RABBIT_IP:rabbit}
#WEB_SERVER_ADDRESS: '0.0.0.0:8000'
#rpc_exchange: 'nameko-rpc'
#max_workers: 10
#parent_calls_tracked: 10
LOGGING:
version: 1
handlers:
console:
class: logging.StreamHandler
# root:
# level: DEBUG
# handlers: [console]
\ No newline at end of file
2026-02-05 17:06:22,165 INFO: starting services: temu_service [in D:\Python\Python39\lib\site-packages\nameko\runners.py:64]
2026-02-05 17:06:22,173 INFO: Connected to amqp://guest:**@127.0.0.1:5672// [in D:\Python\Python39\lib\site-packages\kombu\mixins.py:228]
2026-02-05 17:17:12,625 ERROR: temu_create_order_service Error: 2343155111 [in D:\Documents\xqh_temu_api\services\.\temu_service.py:245]
Traceback (most recent call last):
File "D:\Documents\xqh_temu_api\services\.\temu_service.py", line 75, in temu_create_order_service
cr.execute("""
psycopg2.errors.UndefinedTable: 错误: 关系 "temu_order" 不存在
LINE 3: FROM temu_order
^
2026-02-05 17:17:40,506 INFO: stopping services: temu_service [in D:\Python\Python39\lib\site-packages\nameko\runners.py:75]
2026-02-05 17:17:49,028 INFO: starting services: temu_service [in D:\Python\Python39\lib\site-packages\nameko\runners.py:64]
2026-02-05 17:17:49,037 INFO: Connected to amqp://guest:**@127.0.0.1:5672// [in D:\Python\Python39\lib\site-packages\kombu\mixins.py:228]
2026-02-05 17:44:15,863 INFO: stopping services: temu_service [in D:\Python\Python39\lib\site-packages\nameko\runners.py:75]
2026-02-05 17:44:17,851 INFO: killing services: temu_service [in D:\Python\Python39\lib\site-packages\nameko\runners.py:86]
2026-02-05 17:44:17,852 INFO: killing <ServiceContainer [temu_service] at 0x20049e82a60> [in D:\Python\Python39\lib\site-packages\nameko\containers.py:284]
This source diff could not be displayed because it is too large. You can view the blob instead.
redis
nameko
pandas==1.1.1
pytest
sqlalchemy==1.4.35
psycopg2-binary
apscheduler
requests
psycopg2
wechatpy
pycryptodome
odoorpc
# # !/usr/bin/python
# # -*- coding:utf-8 -*-
# # Author: Zhichang Fu
# # Created Time: 2019-03-26 23:11:14
# '''
# BEGIN
# function:
# Hello Service
# END
# '''
# import ast
# import copy
# import json
# import sys
# import time
# import requests
# import logging
# import hashlib
# import uuid
# from random import Random
# import pandas as pd
# from nameko.rpc import rpc, RpcProxy
# from datetime import datetime, timedelta, timezone
# from dependence.services import config
# from dependence.services import db_handle
# from dependence.services.util import YhjCommon, Order_dispose
# from dependence.services.redis_service import RedisClient
# from logging.handlers import RotatingFileHandler
# # from line_profiler import LineProfiler
# # # 创建一个 LineProfiler 实例
# # profiler = LineProfiler()
# sys.path.append("..")
#
#
# logging.basicConfig(level=logging.INFO) # 调试debug级
# # 创建日志记录器,指明日志保存的路径、每个日志文件的最大大小、保存的日志文件个数上限
# file_log_handler = RotatingFileHandler("logs/temu_service.log", maxBytes=1024 * 1024 * 100, backupCount=15, encoding='utf-8')
# # 创建日志记录的格式 日志等级 输入日志信息的文件名 行数 日志信息
# formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')
# # 为刚创建的日志记录器设置日志记录格式
# file_log_handler.setFormatter(formatter)
# # 为全局的日志工具对象(flask app使用的)添加日志记录器
# logging.getLogger().addHandler(file_log_handler)
# # logging.basicConfig(handlers=[logging.FileHandler('logs/tiktok_test.py.log', 'a', 'utf-8')],
# # format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
#
# _logger = logging.getLogger(__name__)
#
#
# # 只会运行的时候连接一次
# # db_handle = DbService().conn_engine
# odoo_conn = Order_dispose()
# common = YhjCommon()
# redis_client = RedisClient()
# redis_obj = redis_client.get_redis(config.REDIS_NAME, config.REDIS_HOST, config.REDIS_PORT, config.REDIS_DB)
#
#
# class TemuService(object):
#
# name = "temu_service"
#
# @rpc
# def temu_create_order_service(self, kw_data):
# return_res = {"msg": '', "result": {}}
# # 原始数据
# kw = kw_data.get('data', {})
# order_no = kw.get('orderNo')
#
# try:
# # 使用上下文管理器获取游标 (自动处理事务)
# with db_handle.get_cursor() as cr:
# # -------------------------------------------------------
# # 1. 检查订单状态 & 锁行 (FOR UPDATE)
# # -------------------------------------------------------
# cr.execute("""
# SELECT id, version_no, logistics_order_no, delivery_mode, deliver_method
# FROM temu_order
# WHERE temu_delivery_no = %s
# FOR UPDATE
# """, (order_no,))
# existing = cr.fetchone()
# order_id = None
# logistics_order_no = None
# incoming_seq = int(kw.get('sequence', 0))
# if existing:
# order_id, current_seq, logistics_order_no, d_mode, d_method = existing
#
# # === 场景 A: 版本过低 ===
# if incoming_seq < (current_seq or 0):
# return_res['msg'] = '下单失败:版本号低于当前系统版本'
# return_res['result'] = {'orderNo': order_no, 'logisticsOrderNo': logistics_order_no}
# self._log_api(cr, kw_data, order_no, return_res['msg'])
# return return_res
#
# # === 场景 B: 版本一致 (直接返回) ===
# elif incoming_seq == (current_seq or 0):
# # 如果需要返回箱号,查一下
# carton_res = []
# if str(d_mode) != '1' or str(d_method) != '3':
# cr.execute("SELECT carton_no, service_carton_no FROM temu_order_carton WHERE order_id=%s", (order_id,))
# carton_res = [{'cartonNo': row[0], 'logisticsCartonNo': row[1]} for row in cr.fetchall()]
# return_res['result'] = {
# 'orderNo': order_no,
# 'logisticsOrderNo': logistics_order_no,
# 'cartonInfo': carton_res
# }
# return return_res
# # === 场景 C: 更新模式 ===
# else:
# # 原生 SQL 更新主表
# self._update_order(cr, order_id, kw)
# # 清理旧数据 (全删全插模式)
# cr.execute("DELETE FROM temu_order_sku WHERE order_id=%s", (order_id,))
# cr.execute("DELETE FROM temu_order_carton WHERE order_id=%s", (order_id,))
# cr.execute("DELETE FROM temu_order_container WHERE order_id=%s", (order_id,))
#
# else:
# # === 场景 D: 创建模式 ===
# order_id, logistics_order_no = self._insert_order(cr, kw)
#
# # -------------------------------------------------------
# # 2. 批量处理子表 (复用于 创建/更新)
# # -------------------------------------------------------
#
# # --- A. 处理柜信息 (Container) ---
# containers = kw.get('containerInfo', [])
# if containers:
# c_data = [(
# order_id, c.get('containerType'), c.get('containerWeight'), c.get('containerWeightUnit'),
# c.get('containerVolume'), c.get('containerVolumeUnit')
# ) for c in containers]
#
# cr.executemany("""
# INSERT INTO temu_order_container
# (order_id, container_type, container_weight, container_weight_unit, container_volume, container_volume_unit)
# VALUES (%s, %s, %s, %s, %s, %s)
# """, c_data)
#
# # --- B. 处理大箱与SKU (Carton & SKU) ---
# cartons = kw.get('cartonInfo', [])
# carton_insert_values = []
# # 用于暂存 SKU 数据,结构:{'cartonNo': [sku1, sku2...]}
# sku_temp_holder = {}
#
# for c in cartons:
# # 收集大箱参数
# carton_insert_values.append((
# order_id, c['cartonNo'], c.get('weight'), c.get('weightUnit'),
# c.get('length'), c.get('width'), c.get('height'),
# c.get('lengthUnit'), c.get('electrified')
# ))
#
# # 暂存 SKU 数据,后续拿到 ID 后再处理
# if c.get('skuInfo'):
# sku_temp_holder[c['cartonNo']] = c['skuInfo']
#
# # ---------------------------------------------------------------
# # 2. 批量插入大箱 (1次数据库交互)
# # ---------------------------------------------------------------
# if carton_insert_values:
# # 注意:executemany 不支持 RETURNING 返回结果集(在某些驱动下),
# # 所以我们只负责插入,不在这里获取 ID
# carton_sql = """
# INSERT INTO temu_order_carton (
# order_id, carton_no, weight, weight_unit, length, width, height,
# length_unit, is_electrified, create_date
# ) VALUES (
# %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()
# )
# """
# cr.executemany(carton_sql, carton_insert_values)
#
# # ---------------------------------------------------------------
# # 3. 批量回查 ID 并建立映射 (1次数据库交互)
# # ---------------------------------------------------------------
# # 核心技巧:利用 order_id 把刚才插入的所有箱子一次性查出来
# # 假设 carton_no 在一个订单内是唯一的,我们可以用它做 Key
# select_sql = """
# SELECT carton_no, id, service_carton_no
# FROM temu_order_carton
# WHERE order_id = %s
# """
# cr.execute(select_sql, (order_id,))
#
# # 建立映射字典: {'carton_no': db_id}
# # 同时也收集需要返回给 API 的结果
# carton_no_to_id_map = {}
# response_cartons = []
#
# for row in cr.fetchall():
# c_no, db_id, svc_c_no = row
# carton_no_to_id_map[c_no] = db_id
#
# # 收集返回给 Temu 的数据结构
# response_cartons.append({
# 'cartonNo': c_no,
# 'logisticsCartonNo': svc_c_no
# })
#
# # ---------------------------------------------------------------
# # 4. 准备 SKU 的批量插入数据
# # ---------------------------------------------------------------
# sku_insert_values = []
#
# for c_no, sku_list in sku_temp_holder.items():
# # 通过映射找到数据库 ID
# db_carton_id = carton_no_to_id_map.get(c_no)
#
# if db_carton_id:
# for sku in sku_list:
# sku_insert_values.append((
# order_id,
# db_carton_id,
# sku['id'],
# sku['quantity']
# ))
#
# # ---------------------------------------------------------------
# # 5. 批量插入 SKU (1次数据库交互)
# # ---------------------------------------------------------------
# if sku_insert_values:
# sku_sql = """
# INSERT INTO temu_order_sku (order_id, carton_id, sku_id, quantity)
# VALUES (%s, %s, %s, %s)
# """
# cr.executemany(sku_sql, sku_insert_values)
# # -------------------------------------------------------
# # 3. 构造返回
# # -------------------------------------------------------
# final_cartons = []
# # 逻辑判断:是否需要返回箱号
# if str(kw.get('deliverMethod')) != '3' or str(kw.get('deliveryMode')) != '1':
# final_cartons = response_cartons
#
# return_res['result'] = {
# 'orderNo': order_no,
# 'logisticsOrderNo': logistics_order_no,
# 'cartonInfo': final_cartons
# }
# # 记录日志
# self._log_api(cr, kw_data, order_no, "")
#
# except Exception as e:
# _logger.exception(f"Order Processing Error: {order_no}")
# return_res['msg'] = f"系统内部错误: {str(e)}"
# return_res['result'] = {'orderNo': order_no}
# # 这里的 rollback 由 DbService 的 contextmanager 自动处理
# return return_res
#
# # --- 辅助 SQL 方法 ---
#
# def _insert_order(self, cr, kw):
# """ 插入主表并返回 ID 和 生成的单号 """
# sql = """
# INSERT INTO temu_order (
# temu_delivery_no, channel_code, warehouse_name, version_no, deliver_method,
# delivery_mode, is_electrified, predict_charge, predict_charge_currency, export_license_source,
# company_name, company_contact_name, company_phone, company_email,
# ship_contact_name, ship_region1, ship_region2, ship_region3, ship_region4,
# ship_detail_address, ship_email, ship_phone, ship_postcode,
# dest_contact_name, dest_region1, dest_region2, dest_region3, dest_region4,
# dest_detail_address, dest_email, dest_phone, dest_postcode, dest_warehouse_no,
# create_date, write_date, state
# ) VALUES (
# %s, %s, %s, %s, %s,
# %s, %s, %s, %s, %s,
# %s, %s, %s, %s,
# %s, %s, %s, %s, %s,
# %s, %s, %s, %s,
# %s, %s, %s, %s, %s,
# %s, %s, %s, %s, %s,
# NOW(), NOW(), 'progress'
# ) RETURNING id, logistics_order_no
# """
# params = (
# kw.get('orderNo'), kw.get('channelCode'), kw.get('warehouse'), kw.get('sequence'), kw.get('deliverMethod'),
# kw.get('deliveryMode'), kw.get('electrified'), kw.get('predictCharge'), kw.get('predictChargeCurrency'), kw.get('exportLicenseSource'),
# kw.get('companyInfo', {}).get('companyName'), kw.get('companyInfo', {}).get('fullName'), kw.get('companyInfo', {}).get('phone'), kw.get('companyInfo', {}).get('email'),
# kw.get('shippingInfo', {}).get('fullName'), kw.get('shippingInfo', {}).get('regionName1'), kw.get('shippingInfo', {}).get('regionName2'), kw.get('shippingInfo', {}).get('regionName3'), kw.get('shippingInfo', {}).get('regionName4'),
# kw.get('shippingInfo', {}).get('detailedAddress'), kw.get('shippingInfo', {}).get('email'), kw.get('shippingInfo', {}).get('phone'), kw.get('shippingInfo', {}).get('postcode'),
# kw.get('destinationInfo', {}).get('fullName'), kw.get('destinationInfo', {}).get('regionName1'), kw.get('destinationInfo', {}).get('regionName2'), kw.get('destinationInfo', {}).get('regionName3'), kw.get('destinationInfo', {}).get('regionName4'),
# kw.get('destinationInfo', {}).get('detailedAddress'), kw.get('destinationInfo', {}).get('email'), kw.get('destinationInfo', {}).get('phone'), kw.get('destinationInfo', {}).get('postcode'), kw.get('destinationInfo', {}).get('warehouseNo')
# )
# cr.execute(sql, params)
# return cr.fetchone()
#
# def _update_order(self, cr, order_id, kw):
# sql = """
# UPDATE temu_order SET
# channel_code=%s, warehouse_name=%s, version_no=%s, deliver_method=%s,
# delivery_mode=%s, is_electrified=%s, predict_charge=%s, predict_charge_currency=%s,
# export_license_source=%s,
# company_name=%s, company_contact_name=%s, company_phone=%s, company_email=%s,
# ship_contact_name=%s, ship_region1=%s, ship_region2=%s, ship_region3=%s, ship_region4=%s,
# ship_detail_address=%s, ship_email=%s, ship_phone=%s, ship_postcode=%s,
# dest_contact_name=%s, dest_region1=%s, dest_region2=%s, dest_region3=%s, dest_region4=%s,
# dest_detail_address=%s, dest_email=%s, dest_phone=%s, dest_postcode=%s, dest_warehouse_no=%s,
# write_date=NOW()
# WHERE id=%s
# """
# params = (
# kw.get('orderNo'), kw.get('channelCode'), kw.get('warehouse'), kw.get('sequence'), kw.get('deliverMethod'),
# kw.get('deliveryMode'), kw.get('electrified'), kw.get('predictCharge'), kw.get('predictChargeCurrency'), kw.get('exportLicenseSource'),
# kw.get('companyInfo', {}).get('companyName'), kw.get('companyInfo', {}).get('fullName'), kw.get('companyInfo', {}).get('phone'), kw.get('companyInfo', {}).get('email'),
# kw.get('shippingInfo', {}).get('fullName'), kw.get('shippingInfo', {}).get('regionName1'), kw.get('shippingInfo', {}).get('regionName2'), kw.get('shippingInfo', {}).get('regionName3'), kw.get('shippingInfo', {}).get('regionName4'),
# kw.get('shippingInfo', {}).get('detailedAddress'), kw.get('shippingInfo', {}).get('email'), kw.get('shippingInfo', {}).get('phone'), kw.get('shippingInfo', {}).get('postcode'),
# kw.get('destinationInfo', {}).get('fullName'), kw.get('destinationInfo', {}).get('regionName1'), kw.get('destinationInfo', {}).get('regionName2'), kw.get('destinationInfo', {}).get('regionName3'), kw.get('destinationInfo', {}).get('regionName4'),
# kw.get('destinationInfo', {}).get('detailedAddress'), kw.get('destinationInfo', {}).get('email'), kw.get('destinationInfo', {}).get('phone'), kw.get('destinationInfo', {}).get('postcode'), kw.get('destinationInfo', {}).get('warehouseNo'),
# order_id
# )
# cr.execute(sql, params)
#
# def _log_api(self, cr, kw_data, order_no, error_msg):
# try:
# cr.execute("""
# INSERT INTO temu_api_log (big_bag_no, error_msg, push_time, data_text, request_id, source)
# VALUES (%s, %s, NOW(), %s, %s, '推入')
# """, (
# order_no, error_msg,
# json.dumps(kw_data.get('data', {})),
# kw_data.get('request_id')
# ))
# except Exception:
# pass # 日志错误不影响主流程
#
# def get_rf_to_time(self, rfc3339_string):
# # 将RFC 3339时间字符串解析为datetime对象
# dt = datetime.fromisoformat(rfc3339_string.replace('Z', '+00:00'))
# # 如果需要将其转换为UTC时区,可以显式设置时区
# dt_utc = dt.replace(tzinfo=timezone.utc)
# # 输出结果
# dt_utc = dt_utc.strftime("%Y-%m-%d %H:%M:%S")
# print("Parsed datetime (UTC):", dt_utc)
# return dt_utc
#
# @rpc
# def temu_cancel_order_service(self, **kws):
# # 接收取消订单
# return_res = {
# "msg": ''
# }
# order_no = ''
# kw = {}
# utc_time = ''
# try:
# kw = kws['data']
# order_no = kw['orderNo']
# logistics_order_no = kw['logisticsOrderNo']
# sql = "select id from temu_order where temu_delivery_no=%s and logistics_order_no=%s"
# temu_order_id = ''
# temu_order_sql_result = pd.read_sql(sql, con=db_handle, params=(order_no, logistics_order_no))
# utc_time = datetime.utcnow()
# for temu_res in temu_order_sql_result.itertuples():
# temu_order_id = temu_res.id
# if temu_order_id:
# update_cancel_sql = "update temu_order set state='cancel',cancel_date=%s where id=%s"
#
# pd.read_sql(update_cancel_sql, con=db_handle, params=(temu_order_id, utc_time.strftime('%Y-%m-%d %H:%M:%S')), chunksize=100)
# else:
# return_res['msg'] = '系统未查询到订单'
#
# except Exception as err:
# return_res['code'] = 500
# return_res['msg'] = str(err)
# _logger.error('package_cancel error:%s' %
# str(err))
# val = {
# 'big_bag_no': 'Temu订单取消: %s' % order_no,
# 'error_msg': return_res['msg'],
# 'push_time': utc_time,
# 'data_text': json.dumps(kw),
# 'success_bl': False if return_res['msg'] else True,
# 'request_id': kws['request_id'],
# 'source': '推入',
# 'create_date': datetime.utcnow()
# }
# val_df = pd.DataFrame(val, index=[0])
# val_df.to_sql('ao_tt_api_log', con=db_handle, if_exists='append', index=False)
# return return_res
#
FROM coding-public-docker.pkg.coding.net/public/docker/python:3.8
RUN mkdir /etc/supervisor
ADD . /mnt/extra-addons
RUN pip3 install -r /mnt/extra-addons/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
RUN mkdir -p /var/log/supervisor/
RUN pip install supervisor -i https://mirrors.aliyun.com/pypi/simple/
ADD ./supervisord_conf/supervisord.conf /etc/supervisor/
EXPOSE 9001
CMD ["supervisord","-n","-c","/etc/supervisor/supervisord.conf"]
[program:tiktok_queue_1]
process_name=%(program_name)s_%(process_num)02d ; 进程名称
directory = /mnt/extra-addons ; 程序的启动目录
command = /usr/local/bin/python3 /mnt/extra-addons/tiktok_queue.py ; 启动命令
autostart = true ; 在 supervisord 启动的时候也自动启动
startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true ; 程序异常退出后自动重启
startretries = 3 ; 启动失败自动重试次数,默认是 3
user = root ; 用哪个用户启动
numprocs=1 ; 进程数
redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 20MB ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20 ; stdout 日志文件备份数
; stdout 日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /var/log/supervisor/tiktok_queue.log
\ No newline at end of file
[program:tiktok_service_1]
process_name=%(program_name)s_%(process_num)02d ; 进程名称
directory = /mnt/extra-addons ; 程序的启动目录
command = nameko run tiktok_service --config foobar.yaml ; 启动命令
autostart = true ; 在 supervisord 启动的时候也自动启动
startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true ; 程序异常退出后自动重启
startretries = 3 ; 启动失败自动重试次数,默认是 3
user = root ; 用哪个用户启动
numprocs=2 ; 进程数
redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 20MB ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20 ; stdout 日志文件备份数
; stdout 日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /var/log/supervisor/tiktok_service.log
; supervisor config file
[unix_http_server]
file=/var/run/supervisor.sock ; (the path to the socket file)
chmod=0700 ; sockef file mode (default 0700)
[supervisord]
nodaemon=true
logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
childlogdir=/var/log/supervisor ; ('AUTO' child log dir, default $TEMP)
; the below section must remain in the config file for RPC
; (supervisorctl/web interface) to work, additional interfaces may be
; added by defining them in separate rpcinterface: sections
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL for a unix socket
; The [include] section can just contain the "files" setting. This
; setting can list multiple files (separated by whitespace or
; newlines). It can also contain wildcards. The filenames are
; interpreted as relative to this file. Included files *cannot*
; include files themselves.
[inet_http_server]
port=9001
username=admin
password=admin
[include]
files = /mnt/extra-addons/supervisord_conf/conf.d/*.conf
# !/usr/bin/python
# -*- coding:utf-8 -*-
# Author: Zhichang Fu
# Created Time: 2019-03-26 23:11:14
'''
BEGIN
function:
Hello Service
END
'''
import ast
import copy
import json
import sys
import time
import requests
import logging
import hashlib
import uuid
from random import Random
import pandas as pd
from nameko.rpc import rpc, RpcProxy
from datetime import datetime, timedelta, timezone
from dependence.services import config
from dependence.services import db_handle
from dependence.services.util import YhjCommon
from dependence.services.redis_service import RedisClient
from logging.handlers import RotatingFileHandler
# from line_profiler import LineProfiler
# # 创建一个 LineProfiler 实例
# profiler = LineProfiler()
sys.path.append("..")
logging.basicConfig(level=logging.INFO) # 调试debug级
# 创建日志记录器,指明日志保存的路径、每个日志文件的最大大小、保存的日志文件个数上限
file_log_handler = RotatingFileHandler("logs/temu_service.log", maxBytes=1024 * 1024 * 100, backupCount=15, encoding='utf-8')
# 创建日志记录的格式 日志等级 输入日志信息的文件名 行数 日志信息
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')
# 为刚创建的日志记录器设置日志记录格式
file_log_handler.setFormatter(formatter)
# 为全局的日志工具对象(flask app使用的)添加日志记录器
logging.getLogger().addHandler(file_log_handler)
# logging.basicConfig(handlers=[logging.FileHandler('logs/tiktok_test.py.log', 'a', 'utf-8')],
# format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
_logger = logging.getLogger(__name__)
# 只会运行的时候连接一次
# db_handle = DbService().conn_engine
# odoo_conn = Order_dispose()
common = YhjCommon()
redis_client = RedisClient()
redis_obj = redis_client.get_redis(config.REDIS_NAME, config.REDIS_HOST, config.REDIS_PORT, config.REDIS_DB)
class TemuService(object):
name = "temu_service"
@rpc
def temu_create_order_service(self, **kw_data):
return_res = {"msg": '', "result": {}}
# 原始数据
kw = kw_data.get('data', {})
order_no = kw.get('orderNo')
try:
# 使用上下文管理器获取游标 (自动处理事务)
with db_handle.get_cursor() as cr:
# -------------------------------------------------------
# 1. 检查订单状态 & 锁行 (FOR UPDATE)
# -------------------------------------------------------
cr.execute("""
SELECT id, version_no, logistics_order_no, delivery_mode, deliver_method
FROM temu_order
WHERE temu_delivery_no = %s
FOR UPDATE
""", (order_no,))
existing = cr.fetchone()
order_id = None
logistics_order_no = None
incoming_seq = int(kw.get('sequence', 0))
if existing:
order_id, current_seq, logistics_order_no, d_mode, d_method = existing
# === 场景 A: 版本过低 ===
if incoming_seq < (current_seq or 0):
return_res['msg'] = '下单失败:版本号低于当前系统版本'
return_res['result'] = {'orderNo': order_no, 'logisticsOrderNo': logistics_order_no}
self._log_api(cr, kw_data, order_no, return_res['msg'])
return return_res
# === 场景 B: 版本一致 (直接返回) ===
elif incoming_seq == (current_seq or 0):
# 如果需要返回箱号,查一下
carton_res = []
if str(d_mode) != '1' or str(d_method) != '3':
cr.execute("SELECT carton_no, service_carton_no FROM temu_order_carton WHERE order_id=%s", (order_id,))
carton_res = [{'cartonNo': row[0], 'logisticsCartonNo': row[1]} for row in cr.fetchall()]
return_res['result'] = {
'orderNo': order_no,
'logisticsOrderNo': logistics_order_no,
'cartonInfo': carton_res
}
return return_res
# === 场景 C: 更新模式 ===
else:
# 原生 SQL 更新主表
self._update_order(cr, order_id, kw)
# 清理旧数据 (全删全插模式)
cr.execute("DELETE FROM temu_order_sku WHERE order_id=%s", (order_id,))
cr.execute("DELETE FROM temu_order_carton WHERE order_id=%s", (order_id,))
cr.execute("DELETE FROM temu_order_container WHERE order_id=%s", (order_id,))
else:
# === 场景 D: 创建模式 ===
order_id, logistics_order_no = self._insert_order(cr, kw)
# -------------------------------------------------------
# 2. 批量处理子表 (复用于 创建/更新)
# -------------------------------------------------------
# --- A. 处理柜信息 (Container) ---
containers = kw.get('containerInfo', [])
if containers:
c_data = [(
order_id, c.get('containerType'), c.get('containerWeight'), c.get('containerWeightUnit'),
c.get('containerVolume'), c.get('containerVolumeUnit')
) for c in containers]
cr.executemany("""
INSERT INTO temu_order_container
(order_id, container_type, container_weight, container_weight_unit, container_volume, container_volume_unit)
VALUES (%s, %s, %s, %s, %s, %s)
""", c_data)
# --- B. 处理大箱与SKU (Carton & SKU) ---
cartons = kw.get('cartonInfo', [])
carton_insert_values = []
# 用于暂存 SKU 数据,结构:{'cartonNo': [sku1, sku2...]}
sku_temp_holder = {}
for c in cartons:
# 收集大箱参数
carton_insert_values.append((
order_id, c['cartonNo'], c.get('weight'), c.get('weightUnit'),
c.get('length'), c.get('width'), c.get('height'),
c.get('lengthUnit'), c.get('electrified')
))
# 暂存 SKU 数据,后续拿到 ID 后再处理
if c.get('skuInfo'):
sku_temp_holder[c['cartonNo']] = c['skuInfo']
# ---------------------------------------------------------------
# 2. 批量插入大箱 (1次数据库交互)
# ---------------------------------------------------------------
if carton_insert_values:
# 注意:executemany 不支持 RETURNING 返回结果集(在某些驱动下),
# 所以我们只负责插入,不在这里获取 ID
carton_gen_sql = "('CTN' || to_char(NOW(), 'YYYYMMDD') || lpad(nextval('seq_temu_carton_service')::text, 7, '0'))"
carton_sql = f"""
INSERT INTO temu_order_carton (
order_id, carton_no, service_carton_no, weight, weight_unit, length, width, height,
length_unit, is_electrified, create_date
) VALUES (
%s, %s, {carton_gen_sql}, %s, %s, %s, %s, %s, %s, %s, NOW()
)
"""
cr.executemany(carton_sql, carton_insert_values)
# ---------------------------------------------------------------
# 3. 批量回查 ID 并建立映射 (1次数据库交互)
# ---------------------------------------------------------------
# 核心技巧:利用 order_id 把刚才插入的所有箱子一次性查出来
# 假设 carton_no 在一个订单内是唯一的,我们可以用它做 Key
select_sql = """
SELECT carton_no, id, service_carton_no
FROM temu_order_carton
WHERE order_id = %s
"""
cr.execute(select_sql, (order_id,))
# 建立映射字典: {'carton_no': db_id}
# 同时也收集需要返回给 API 的结果
carton_no_to_id_map = {}
response_cartons = []
for row in cr.fetchall():
c_no, db_id, svc_c_no = row
carton_no_to_id_map[c_no] = db_id
# 收集返回给 Temu 的数据结构
response_cartons.append({
'cartonNo': c_no,
'logisticsCartonNo': svc_c_no
})
# ---------------------------------------------------------------
# 4. 准备 SKU 的批量插入数据
# ---------------------------------------------------------------
sku_insert_values = []
for c_no, sku_list in sku_temp_holder.items():
# 通过映射找到数据库 ID
db_carton_id = carton_no_to_id_map.get(c_no)
if db_carton_id:
for sku in sku_list:
sku_insert_values.append((
order_id,
db_carton_id,
sku['id'],
sku['quantity']
))
# ---------------------------------------------------------------
# 5. 批量插入 SKU (1次数据库交互)
# ---------------------------------------------------------------
if sku_insert_values:
sku_sql = """
INSERT INTO temu_order_sku (order_id, carton_id, sku_id, quantity)
VALUES (%s, %s, %s, %s)
"""
cr.executemany(sku_sql, sku_insert_values)
# -------------------------------------------------------
# 3. 构造返回
# -------------------------------------------------------
final_cartons = []
# 逻辑判断:是否需要返回箱号
if str(kw.get('deliverMethod')) != '3' or str(kw.get('deliveryMode')) != '1':
final_cartons = response_cartons
return_res['result'] = {
'orderNo': order_no,
'logisticsOrderNo': logistics_order_no,
'cartonInfo': final_cartons
}
# 记录日志
self._log_api(cr, kw_data, order_no, "")
except Exception as e:
_logger.exception(f"temu_create_order_service Error: {order_no}")
return_res['msg'] = f"系统内部错误: {str(e)}"
return_res['result'] = {'orderNo': order_no}
# 这里的 rollback 由 DbService 的 contextmanager 自动处理
return return_res
# --- 辅助 SQL 方法 ---
def _insert_order(self, cr, kw):
""" 插入主表并返回 ID 和 生成的单号 """
logistics_no_sql = "('LGS' || to_char(NOW(), 'YYYYMMDD') || lpad(nextval('seq_temu_logistics_order')::text, 4, '0'))"
sql = f"""
INSERT INTO temu_order (
temu_delivery_no, logistics_order_no, channel_code, warehouse_name, version_no, deliver_method,
delivery_mode, is_electrified, predict_charge, predict_charge_currency, export_license_source,
company_name, company_contact_name, company_phone, company_email,
ship_contact_name, ship_region1, ship_region2, ship_region3, ship_region4,
ship_detail_address, ship_email, ship_phone, ship_postcode,
dest_contact_name, dest_region1, dest_region2, dest_region3, dest_region4,
dest_detail_address, dest_email, dest_phone, dest_postcode, dest_warehouse_no,
create_date, write_date, state, carton_count
) VALUES (
%s, {logistics_no_sql}, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
NOW(), NOW(), 'progress', %s
) RETURNING id, logistics_order_no
"""
params = (
kw.get('orderNo'), kw.get('channelCode'), kw.get('warehouse'), kw.get('sequence'), kw.get('deliverMethod'),
kw.get('deliveryMode'), kw.get('electrified'), kw.get('predictCharge'), kw.get('predictChargeCurrency'), kw.get('exportLicenseSource'),
kw.get('companyInfo', {}).get('companyName'), kw.get('companyInfo', {}).get('fullName'), kw.get('companyInfo', {}).get('phone'), kw.get('companyInfo', {}).get('email'),
kw.get('shippingInfo', {}).get('fullName'), kw.get('shippingInfo', {}).get('regionName1'), kw.get('shippingInfo', {}).get('regionName2'), kw.get('shippingInfo', {}).get('regionName3'), kw.get('shippingInfo', {}).get('regionName4'),
kw.get('shippingInfo', {}).get('detailedAddress'), kw.get('shippingInfo', {}).get('email'), kw.get('shippingInfo', {}).get('phone'), kw.get('shippingInfo', {}).get('postcode'),
kw.get('destinationInfo', {}).get('fullName'), kw.get('destinationInfo', {}).get('regionName1'), kw.get('destinationInfo', {}).get('regionName2'), kw.get('destinationInfo', {}).get('regionName3'), kw.get('destinationInfo', {}).get('regionName4'),
kw.get('destinationInfo', {}).get('detailedAddress'), kw.get('destinationInfo', {}).get('email'), kw.get('destinationInfo', {}).get('phone'), kw.get('destinationInfo', {}).get('postcode'), kw.get('destinationInfo', {}).get('warehouseNo'),
len(kw.get('cartonInfo', []))
)
cr.execute(sql, params)
return cr.fetchone()
def _update_order(self, cr, order_id, kw):
sql = """
UPDATE temu_order SET
channel_code=%s, warehouse_name=%s, version_no=%s, deliver_method=%s,
delivery_mode=%s, is_electrified=%s, predict_charge=%s, predict_charge_currency=%s,
export_license_source=%s,
company_name=%s, company_contact_name=%s, company_phone=%s, company_email=%s,
ship_contact_name=%s, ship_region1=%s, ship_region2=%s, ship_region3=%s, ship_region4=%s,
ship_detail_address=%s, ship_email=%s, ship_phone=%s, ship_postcode=%s,
dest_contact_name=%s, dest_region1=%s, dest_region2=%s, dest_region3=%s, dest_region4=%s,
dest_detail_address=%s, dest_email=%s, dest_phone=%s, dest_postcode=%s, dest_warehouse_no=%s,
write_date=NOW()
WHERE id=%s
"""
params = (
kw.get('orderNo'), kw.get('channelCode'), kw.get('warehouse'), kw.get('sequence'), kw.get('deliverMethod'),
kw.get('deliveryMode'), kw.get('electrified'), kw.get('predictCharge'), kw.get('predictChargeCurrency'), kw.get('exportLicenseSource'),
kw.get('companyInfo', {}).get('companyName'), kw.get('companyInfo', {}).get('fullName'), kw.get('companyInfo', {}).get('phone'), kw.get('companyInfo', {}).get('email'),
kw.get('shippingInfo', {}).get('fullName'), kw.get('shippingInfo', {}).get('regionName1'), kw.get('shippingInfo', {}).get('regionName2'), kw.get('shippingInfo', {}).get('regionName3'), kw.get('shippingInfo', {}).get('regionName4'),
kw.get('shippingInfo', {}).get('detailedAddress'), kw.get('shippingInfo', {}).get('email'), kw.get('shippingInfo', {}).get('phone'), kw.get('shippingInfo', {}).get('postcode'),
kw.get('destinationInfo', {}).get('fullName'), kw.get('destinationInfo', {}).get('regionName1'), kw.get('destinationInfo', {}).get('regionName2'), kw.get('destinationInfo', {}).get('regionName3'), kw.get('destinationInfo', {}).get('regionName4'),
kw.get('destinationInfo', {}).get('detailedAddress'), kw.get('destinationInfo', {}).get('email'), kw.get('destinationInfo', {}).get('phone'), kw.get('destinationInfo', {}).get('postcode'), kw.get('destinationInfo', {}).get('warehouseNo'),
order_id
)
cr.execute(sql, params)
def _log_api(self, cr, kw_data, order_no, error_msg):
try:
cr.execute("""
INSERT INTO temu_api_log (big_bag_no, error_msg, push_time, data_text, request_id, source)
VALUES (%s, %s, NOW(), %s, %s, '推入')
""", (
order_no, error_msg,
json.dumps(kw_data.get('data', {})),
kw_data.get('request_id')
))
except Exception:
pass # 日志错误不影响主流程
def get_rf_to_time(self, rfc3339_string):
# 将RFC 3339时间字符串解析为datetime对象
dt = datetime.fromisoformat(rfc3339_string.replace('Z', '+00:00'))
# 如果需要将其转换为UTC时区,可以显式设置时区
dt_utc = dt.replace(tzinfo=timezone.utc)
# 输出结果
dt_utc = dt_utc.strftime("%Y-%m-%d %H:%M:%S")
print("Parsed datetime (UTC):", dt_utc)
return dt_utc
# @rpc
# def temu_create_order_service(self, **kw_data):
# # 接收temu订单信息
# return_res = {
# "msg": '',
# "result": {}
# }
# order_no = ''
# kw = {}
# try:
# # 获取订单号
# # 查询订单是否存在系统 不存在则创建
# # 存在 则判断版本号 版本号低于订单的版本号返回失败 高于则更新订单
# kw = kw_data['data']
# order_no = kw['orderNo']
# # 传入的数据
# vals = {
# "temu_delivery_no": kw['orderNo'],
# "channel_code": kw['channelCode'],
# "warehouse_name": kw.get('warehouse'),
# "version_no": kw['sequence'],
# "deliver_method": kw['deliverMethod'],
# "delivery_mode": kw['deliveryMode'],
# "is_electrified": kw['electrified'],
# "predict_charge": kw.get('predictCharge'),
# "predict_charge_currency": kw.get('predictChargeCurrency'),
#
# "export_license_source": kw['exportLicenseSource'],
# # 公司信息
# "company_name": kw.get('companyInfo', {}).get('companyName', ''),
# "company_contact_name": kw.get('companyInfo', {}).get('fullName', ''),
# "company_phone": kw.get('companyInfo', {}).get('phone', ''),
# "company_email": kw.get('companyInfo', {}).get('email', ''),
# # 发货仓
# "ship_contact_name": kw.get('shippingInfo', {}).get('fullName', ''),
# "ship_region1": kw.get('shippingInfo', {}).get('regionName1', ''),
# "ship_region2": kw.get('shippingInfo', {}).get('regionName2', ''),
# "ship_region3": kw.get('shippingInfo', {}).get('regionName3', ''),
# "ship_region4": kw.get('shippingInfo', {}).get('regionName4', ''),
# "ship_detail_address": kw.get('shippingInfo', {}).get('detailedAddress', ''),
# "ship_email": kw.get('shippingInfo', {}).get('email', ''),
# "ship_phone": kw.get('shippingInfo', {}).get('phone', ''),
# "ship_postcode": kw.get('shippingInfo', {}).get('postcode', ''),
# # 目的仓
# "dest_contact_name": kw.get('destinationInfo', {}).get('fullName', ''),
# "dest_region1": kw.get('destinationInfo', {}).get('regionName1', ''),
# "dest_region2": kw.get('destinationInfo', {}).get('regionName2', ''),
# "dest_region3": kw.get('destinationInfo', {}).get('regionName3', ''),
# "dest_region4": kw.get('destinationInfo', {}).get('regionName4', ''),
# "dest_detail_address": kw.get('destinationInfo', {}).get('detailedAddress', ''),
# "dest_email": kw.get('destinationInfo', {}).get('email', ''),
# "dest_phone": kw.get('destinationInfo', {}).get('phone', ''),
# "dest_postcode": kw.get('destinationInfo', {}).get('postcode', ''),
# "dest_warehouse_no": kw.get('destinationInfo', {}).get('warehouseNo', ''),
# "carton_count": len(kw.get('containerInfo', []))
# }
# sql = "select id,logistics_order_no,version_no,delivery_mode,deliver_method from temu_order where temu_delivery_no=%s"
# temu_order_sql_result = pd.read_sql(sql, con=db_handle, params=(order_no, ))
# temu_order_id = ''
# temu_order_version_no = 0
# temu_logistics_order_no = ''
# temu_delivery_mode = ''
# temu_deliver_method = ''
# for temu_res in temu_order_sql_result.itertuples():
# temu_order_id = temu_res.id
# temu_order_version_no = temu_res.version_no
# temu_logistics_order_no = temu_res.logistics_order_no
# temu_delivery_mode = temu_res.delivery_mode
# temu_deliver_method = temu_res.deliver_method
# if temu_order_id: # 存在
# # 判断版本号
# result = {
# 'orderNo': order_no,
# 'logisticsOrderNo': temu_logistics_order_no,
# # 'cartonInfo': [{
# # "cartonNo": "",
# # "logisticsCartonNo": "",
# # }]
# }
# if vals['version_no'] < temu_order_version_no:
# return_res['msg'] = '下单失败:版本号低于当前系统版本'
# return_res['result'] = result
# elif vals['version_no'] == temu_order_version_no:
# carton_info_arr = []
# if temu_delivery_mode != '1' or temu_deliver_method != '3':
# sql = "select carton_no,service_carton_no from temu_order_carton where order_id=%s"
# order_carton_result = pd.read_sql(sql, con=db_handle, params=(temu_order_id, ))
# for carton_res in order_carton_result.itertuples():
# carton_info_arr.append({
# 'cartonNo': carton_res.carton_no,
# 'logisticsCartonNo': carton_res.service_carton_no,
# })
# result['cartonInfo'] = carton_info_arr
# return_res['result'] = result
# else:
# # 更新订单数据
# update_query = """
# UPDATE temu_order
# SET
# channel_code = %s,
# warehouse_name = %s,
# version_no = %s,
# deliver_method = %s,
# delivery_mode = %s,
# is_electrified = %s,
# predict_charge = %s,
# predict_charge_currency = %s,
# export_license_source = %s,
#
# -- 公司信息
# company_name = %s,
# company_contact_name = %s,
# company_phone = %s,
# company_email = %s,
#
# -- 发货仓信息
# ship_contact_name = %s,
# ship_region1 = %s,
# ship_region2 = %s,
# ship_region3 = %s,
# ship_region4 = %s,
# ship_detail_address = %s,
# ship_email = %s,
# ship_phone = %s,
# ship_postcode = %s,
#
# -- 目的仓信息
# dest_contact_name = %s,
# dest_region1 = %s,
# dest_region2 = %s,
# dest_region3 = %s,
# dest_region4 = %s,
# dest_detail_address = %s,
# dest_email = %s,
# dest_phone = %s,
# dest_postcode = %s,
# dest_warehouse_no = %s,
#
# WHERE id = %s
# """
# params = (
# vals.get('channel_code'),
# vals.get('warehouse_name'),
# vals.get('version_no'),
# vals.get('deliver_method'),
# vals.get('delivery_mode'),
# vals.get('is_electrified'),
# vals.get('predict_charge'),
# vals.get('predict_charge_currency'),
# vals.get('export_license_source'),
#
# # 公司信息
# vals.get('company_name'),
# vals.get('company_contact_name'),
# vals.get('company_phone'),
# vals.get('company_email'),
#
# # 发货仓
# vals.get('ship_contact_name'),
# vals.get('ship_region1'),
# vals.get('ship_region2'),
# vals.get('ship_region3'),
# vals.get('ship_region4'),
# vals.get('ship_detail_address'),
# vals.get('ship_email'),
# vals.get('ship_phone'),
# vals.get('ship_postcode'),
#
# # 目的仓
# vals.get('dest_contact_name'),
# vals.get('dest_region1'),
# vals.get('dest_region2'),
# vals.get('dest_region3'),
# vals.get('dest_region4'),
# vals.get('dest_detail_address'),
# vals.get('dest_email'),
# vals.get('dest_phone'),
# vals.get('dest_postcode'),
# vals.get('dest_warehouse_no'),
#
# temu_order_id
# )
# pd.read_sql(update_query, con=db_handle, params=params, chunksize=100) # 更新temu_order的sql语句
# delete_sku_sql = 'delete from temu_order_sku where order_id=%s' % temu_order_id
# pd.read_sql(delete_sku_sql, con=db_handle, chunksize=100)
# delete_container_sql = 'delete from temu_order_container where order_id=%s' % temu_order_id
# pd.read_sql(delete_container_sql, con=db_handle, chunksize=100)
# delete_carton_sql = 'delete from temu_order_carton where order_id=%s' % temu_order_id
# pd.read_sql(delete_carton_sql, con=db_handle, chunksize=100)
# # 创建柜
# container_arr = kw.get('containerInfo', [])
# create_container_arr = []
# for container_data in container_arr:
# create_container_arr.append({
# "order_id": temu_order_id,
# "container_type": container_data['containerType'],
# "container_weight": container_data['containerWeight'],
# "container_weight_unit": container_data['containerWeightUnit'],
# "container_volume": container_data['containerVolume'],
# "container_volume_unit": container_data['containerVolumeUnit'],
# })
# if create_container_arr:
# # 创建柜信息
# val_df = pd.DataFrame(create_container_arr)
# val_df.to_sql('temu_order_container', con=db_handle, if_exists='append', index=False)
# # 创建大箱
# create_carton_no_arr = []
# # values_str = ''
# carton_arr = kw.get('cartonInfo', [])
# create_carton_arr = []
# create_sku_arr = []
# for carton_data in carton_arr:
# carton_no = carton_data['cartonNo']
# create_carton_no_arr.append(carton_no)
# create_carton_arr.append({
# 'order_id': temu_order_id,
# 'carton_no': carton_no,
# 'weight': carton_data['weight'],
# 'weight_unit': carton_data['weightUnit'],
# 'length': carton_data['length'],
# 'width': carton_data['width'],
# 'height': carton_data['height'],
# 'length_unit': carton_data['lengthUnit'],
# 'is_electrified': carton_data['electrified'],
# })
# sku_arr = carton_data.get('skuInfo', [])
# for sku_data in sku_arr:
# create_sku_arr.append({
# 'carton_no': carton_no,
# 'sku_id': sku_data['id'],
# 'quantity': sku_data['quantity'],
# })
# return_carton_arr = []
# if create_carton_arr:
# carton_no_id_res = {}
# val_df = pd.DataFrame(create_carton_arr)
# val_df.to_sql('temu_order_carton', con=db_handle, if_exists='append', index=False)
# # 获取创建后 大箱单号对应的id,bl_id
# carton_no_str = '(%s)' % str(create_carton_no_arr)[1:-1]
# sql = 'select id,carton_no,service_carton_no,order_id from temu_order_carton where carton_no in %s' % carton_no_str
# new_order_carton_arr = pd.read_sql(sql, con=db_handle)
# for new_carton_data in new_order_carton_arr.itertuples():
# carton_no_id_res[new_carton_data.carton_no] = (new_carton_data.id, new_carton_data.order_id)
# return_carton_arr.append({
# 'cartonNo': new_carton_data.carton_no,
# 'logisticsCartonNo': new_carton_data.service_carton_no,
# })
# if create_sku_arr:
# for create_sku_vals in create_sku_arr:
# carton_no = create_sku_vals['carton_no']
# data_list = carton_no_id_res.get(carton_no, [])
# create_sku_vals['carton_id'] = data_list[0] if data_list else None
# create_sku_vals['order_id'] = data_list[1] if data_list else None
# val_df = pd.DataFrame(create_sku_arr)
# val_df.to_sql('temu_order_sku', con=db_handle, if_exists='append', index=False)
# if vals['deliver_method'] != '3' or vals['delivery_mode'] != '1':
# result['cartonInfo'] = return_carton_arr
# return_res['result'] = result
# else:
# # 创建
# val_df = pd.DataFrame(vals, index=[0])
# val_df.to_sql('temu_order', con=db_handle, if_exists='append', index=False)
# sql = "select id,logistics_order_no from temu_order where temu_delivery_no='%s';" % vals['temu_delivery_no']
# new_order = pd.read_sql(sql, con=db_handle)
# logging.info("new_order:%s" % len(new_order))
# result_id = new_order.to_dict()['id'][0]
# logistics_order_no = new_order.to_dict()['logistics_order_no'][0]
# result = {
# 'orderNo': order_no,
# 'logisticsOrderNo': logistics_order_no,
# # 'cartonInfo': [{
# # "cartonNo": "",
# # "logisticsCartonNo": "",
# # }]
# }
# # 创建柜
# container_arr = kw.get('containerInfo', [])
# create_container_arr = []
# for container_data in container_arr:
# create_container_arr.append({
# "order_id": result_id,
# "container_type": container_data['containerType'],
# "container_weight": container_data['containerWeight'],
# "container_weight_unit": container_data['containerWeightUnit'],
# "container_volume": container_data['containerVolume'],
# "container_volume_unit": container_data['containerVolumeUnit'],
#
# })
# if create_container_arr:
# # 创建柜信息
# val_df = pd.DataFrame(create_container_arr)
# val_df.to_sql('temu_order_container', con=db_handle, if_exists='append', index=False)
# # 创建大箱
# create_carton_no_arr = []
# # values_str = ''
# carton_arr = kw.get('cartonInfo', [])
# create_carton_arr = []
# create_sku_arr = []
# for carton_data in carton_arr:
# carton_no = carton_data['cartonNo']
# create_carton_no_arr.append(carton_no)
# create_carton_arr.append({
# 'order_id': result_id,
# 'carton_no': carton_no,
# 'weight': carton_data['weight'],
# 'weight_unit': carton_data['weightUnit'],
# 'length': carton_data['length'],
# 'width': carton_data['width'],
# 'height': carton_data['height'],
# 'length_unit': carton_data['lengthUnit'],
# 'is_electrified': carton_data['electrified'],
# })
# sku_arr = carton_data.get('skuInfo', [])
# for sku_data in sku_arr:
# create_sku_arr.append({
# 'carton_no': carton_no,
# 'sku_id': sku_data['id'],
# 'quantity': sku_data['quantity'],
# })
# return_carton_arr = []
# if create_carton_arr:
# carton_no_id_res = {}
# val_df = pd.DataFrame(create_carton_arr)
# val_df.to_sql('temu_order_carton', con=db_handle, if_exists='append', index=False)
# # 获取创建后 大箱单号对应的id,bl_id
# carton_no_str = '(%s)' % str(create_carton_no_arr)[1:-1]
# sql = 'select id,carton_no,service_carton_no,order_id from temu_order_carton where carton_no in %s' % carton_no_str
# new_order_carton_arr = pd.read_sql(sql, con=db_handle)
# for new_carton_data in new_order_carton_arr.itertuples():
# carton_no_id_res[new_carton_data.carton_no] = (new_carton_data.id, new_carton_data.order_id)
# return_carton_arr.append({
# 'cartonNo': new_carton_data.carton_no,
# 'logisticsCartonNo': new_carton_data.service_carton_no,
# })
# if create_sku_arr:
# for create_sku_vals in create_sku_arr:
# carton_no = create_sku_vals['carton_no']
# data_list = carton_no_id_res.get(carton_no, [])
# create_sku_vals['carton_id'] = data_list[0] if data_list else None
# create_sku_vals['order_id'] = data_list[1] if data_list else None
# val_df = pd.DataFrame(create_sku_arr)
# val_df.to_sql('temu_order_sku', con=db_handle, if_exists='append', index=False)
# if vals['deliver_method'] != '3' or vals['delivery_mode'] != '1':
# result['cartonInfo'] = return_carton_arr
# return_res['result'] = result
# except Exception as err:
# return_res['msg'] = str(err)
# result = {'orderNo': order_no}
# return_res['result'] = result
# _logger.error('temu_create_order_service error:%s' %
# str(err))
# log_val = {
# 'big_bag_no': order_no,
# 'error_msg': return_res['msg'],
# 'push_time': datetime.utcnow(),
# 'data_text': json.dumps(kw),
# 'success_bl': return_res['msg'],
# 'request_id': kw_data['request_id'],
# 'source': '推入',
# }
# val_df = pd.DataFrame(log_val, index=[0])
# val_df.to_sql('temu_api_log', con=db_handle, if_exists='append', index=False)
# return return_res
# @rpc
# def temu_create_order_service(self, **kw):
# # 接收清关包裹信息
# msg = ''
# return_res = {
# "all_result": True,
# "failed_provider_order_ids": [],
# "err_msg": {},
# }
# logistic_order_no = ''
# data_text = ''
# utc_time = ''
# try:
# push_interval_15_days = 15
# tt_customer_id = None
# # 查询过滤最近天数 查询默认客户
# sql = "select key,value from ir_config_parameter where key='tk_push_interval_15_days' or key='tt_customer_id'"
# result_arr = pd.read_sql(sql, con=db_handle)
# for res in result_arr.itertuples():
# if res.key == 'tk_push_interval_15_days':
# push_interval_15_days = int(res.value)
# elif res.key == 'tt_customer_id':
# tt_customer_id = int(res.value)
# # 查询默认小包状态
# node_id = None
# sql = "select id from cc_node where node_type='package' and is_default=True limit 1"
# node_arr = pd.read_sql(sql, con=db_handle)
# for node_res in node_arr.itertuples():
# node_id = int(node_res.id)
# parcel_arr = kw['packages']
# date = datetime.now() - timedelta(days=int(push_interval_15_days))
# create_package_arr = [] # 创建小包vals数据
# delete_goods_ids = [] # 删除的小包商品
# package_no_id_res = {} # 维护小包号和id的对应
# create_package_no_arr = [] # 创建的小包号列表
# package_goods_vals_arr = [] # 创建的小包商品vals数据
# data_text = json.dumps(parcel_arr)
# utc_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
# for package in parcel_arr:
# try:
# logistic_order_no = package.get('provider_order_id')
# sql = "select id,bl_id,is_cancel from cc_ship_package where logistic_order_no=%s and create_date >= %s"
# parcel_sql_result = pd.read_sql(sql, con=db_handle, params=(logistic_order_no, date))
# parcel_id = ''
# parcel_bl_id = ''
# for parcel_res in parcel_sql_result.itertuples():
# parcel_id = parcel_res.id
# parcel_bl_id = parcel_res.bl_id
# parcel_is_cancel = parcel_res.is_cancel
# if (parcel_id and (not parcel_bl_id or
# parcel_is_cancel)) or not parcel_id:
# ship_package_vals = dict(create_date=utc_time,state=node_id,
# is_cancel=False, cancel_reason='',
# logistic_order_no=package.get(
# 'provider_order_id'),
# tracking_no=package.get(
# 'tracking_no'),
# customer_ref=package.get(
# 'declaretion_bill_id'),
# internal_account_number="",
# user_track_note=package.get(
# 'remark'),
# company_code=package.get(
# 'entity_code'),
# trade_no=package.get(
# 'order_no'),
# # 需要将时间搓转换为时间
# operation_time=datetime.fromtimestamp(
# int(package.get('operate_time')) / 1000).strftime(
# '%Y-%m-%d %H:%M:%S'),
# big_package_no=package.get(
# 'big_bag_no'),
# container_no=package.get(
# 'container_no'),
# buyer_region=package.get(
# 'buyer_region'),
# next_provider_name=package.get(
# 'next_provider_name'),
# sender_name=package.get(
# 'sender_info').get('name'),
# sender_vat_no=package.get(
# 'sender_info').get('shipping_tax_id'),
# sender_phone=package.get(
# 'sender_info').get('phone'),
# sender_country=package.get('sender_info').get('address').get(
# 'address_l0'),
# sender_state=package.get('sender_info').get('address').get(
# 'address_l1'),
# sender_city=package.get('sender_info').get(
# 'address').get('address_l2'),
# sender_add_1=package.get('sender_info').get('address').get(
# 'address_l3'),
# sender_add_2=package.get('sender_info').get('address').get(
# 'address_l4'),
# sender_add_3=package.get('sender_info').get(
# 'address').get('details'),
# sender_postcode=package.get(
# 'sender_info').get('postcode'),
# receiver_name=package.get(
# 'receiver_info').get('name'),
# receiver_phone=package.get(
# 'receiver_info').get('phone'),
# receiver_postcode=package.get(
# 'receiver_info').get('postcode'),
# receiver_add_1=package.get('receiver_info').get('address').get(
# 'address_l0'),
# receiver_add_2=package.get('receiver_info').get('address').get(
# 'address_l1'),
# receiver_add_3=package.get('receiver_info').get('address').get(
# 'address_l2'),
# receiver_city=package.get('receiver_info').get('address').get(
# 'address_l3'),
# receiver_county=package.get('receiver_info').get('address').get(
# 'address_l4'),
# receiver_detailed_address=package.get(
# 'receiver_info').get('address').get('details'),
# receiver_vat_no=package.get(
# 'receiver_info').get('tax_id'),
# currency=package.get(
# 'currency'),
# gross_weight=package.get(
# 'package').get('real_weight'),
# weight_unit=package.get(
# 'package').get('weight_unit'),
# total_value=package.get(
# 'value').get('goods_value'),
# customer_id=tt_customer_id, is_sync=True) # 增加客户信息
# if package.get('items') and len(package.get('items')) > 0:
#
# good_id_arr = []
# for item in package.get('items'):
# item_id = item.get('item_id')
# if item_id not in good_id_arr:
# good_id_arr.append(item_id)
# package_good = dict(create_date=utc_time,bl_line_id=logistic_order_no,
# bl_id=False,
# item_id=item_id,
# sku_id=item.get(
# 'sku_id'),
# item_name_cn=item.get(
# 'product_name_cn'),
# item_name_en=item.get(
# 'product_name'),
# export_hs_code=item.get(
# 'export_hscode'),
# import_hs_code=item.get(
# 'import_hscode'),
# weight=item.get(
# 'weight') or None,
# quantity=item.get(
# 'qty'),
# quantity_unit=item.get(
# 'unit'),
# declare_price=item.get(
# 'unit_price') or None,
# freight=item.get(
# 'shipping_fee') or None,
# cod_amount=item.get(
# 'cod_fee') or None,
# vat_rate=item.get(
# 'vat_rate') or None,
# item_vat=item.get(
# 'vat_fee') or None,
# origin_country=item.get(
# 'origin_country'),
# item_type=item.get(
# 'item_type') or None,
# item_total_price=item.get(
# 'unit_price') or None,
# item_link=item.get(
# 'item_url'),
# item_tax_status=item.get('tax_mark') or None,)
# # _logger.info('package_good:%s' % package_good)
# if package_good:
# package_goods_vals_arr.append(package_good)
# if not parcel_id:
# # 创建小包
# create_package_no_arr.append(logistic_order_no)
# create_package_arr.append(ship_package_vals)
# else:
# # 删除小包商品
# package_no_id_res[logistic_order_no] = (parcel_id, parcel_bl_id) # 小包号对应id
# sql = 'select id from cc_package_good where bl_line_id = %s'
# delete_good_result = pd.read_sql(sql, con=db_handle, params=(parcel_id,))
# origin_goods_ids = delete_good_result['id'].tolist()
# delete_goods_ids += origin_goods_ids
# # 更新小包
# update_sql = """
# UPDATE cc_ship_package
# SET
# is_cancel = %s,
# cancel_reason = %s,
# logistic_order_no = %s,
# tracking_no = %s,
# customer_ref = %s,
# internal_account_number = %s,
# user_track_note = %s,
# company_code = %s,
# trade_no = %s,
# operation_time = %s,
# big_package_no = %s,
# container_no = %s,
# buyer_region = %s,
# next_provider_name = %s,
# sender_name = %s,
# sender_vat_no = %s,
# sender_phone = %s,
# sender_country = %s,
# sender_state = %s,
# sender_city = %s,
# sender_add_1 = %s,
# sender_add_2 = %s,
# sender_add_3 = %s,
# sender_postcode = %s,
# receiver_name = %s,
# receiver_phone = %s,
# receiver_postcode = %s,
# receiver_add_1 = %s,
# receiver_add_2 = %s,
# receiver_add_3 = %s,
# receiver_city = %s,
# receiver_county = %s,
# receiver_detailed_address = %s,
# receiver_vat_no = %s,
# currency = %s,
# gross_weight = %s,
# weight_unit = %s,
# total_value = %s,
# customer_id = %s
# WHERE id = %s;
#
# """
# # 构建参数列表
# params = (
# False, # is_cancel
# '', # cancel_reason
# ship_package_vals['logistic_order_no'],
# ship_package_vals['tracking_no'],
# ship_package_vals['customer_ref'],
# '', # internal_account_number
# ship_package_vals['user_track_note'],
# ship_package_vals['company_code'],
# ship_package_vals['trade_no'],
# ship_package_vals['operation_time'],
# ship_package_vals['big_package_no'],
# ship_package_vals['container_no'],
# ship_package_vals['buyer_region'],
# ship_package_vals['next_provider_name'],
# ship_package_vals['sender_name'],
# ship_package_vals['sender_vat_no'],
# ship_package_vals['sender_phone'],
# ship_package_vals['sender_country'],
# ship_package_vals['sender_state'],
# ship_package_vals['sender_city'],
# ship_package_vals['sender_add_1'],
# ship_package_vals['sender_add_2'],
# ship_package_vals['sender_add_3'],
# ship_package_vals['sender_postcode'],
# ship_package_vals['receiver_name'],
# ship_package_vals['receiver_phone'],
# ship_package_vals['receiver_postcode'],
# ship_package_vals['receiver_add_1'],
# ship_package_vals['receiver_add_2'],
# ship_package_vals['receiver_add_3'],
# ship_package_vals['receiver_city'],
# ship_package_vals['receiver_county'],
# ship_package_vals['receiver_detailed_address'],
# ship_package_vals['receiver_vat_no'],
# ship_package_vals['currency'],
# ship_package_vals['gross_weight'],
# ship_package_vals['weight_unit'],
# ship_package_vals['total_value'],
# tt_customer_id,
# parcel_id # id
# )
# pd.read_sql(update_sql, con=db_handle, params=params, chunksize=100)
# except Exception as err:
# return_res['all_result'] = False
# return_res['failed_provider_order_ids'].append(
# package.get('provider_order_id'))
# return_res['err_msg'].update(
# {package.get('provider_order_id'): str(err)})
# msg = str(err)
# _logger.error('package_declare error:%s' %
# str(err))
# # 创建小包
# # print('create_package_arr', create_package_arr)
# if create_package_arr:
# val_df = pd.DataFrame(create_package_arr)
# val_df.to_sql('cc_ship_package', con=db_handle, if_exists='append', index=False)
# # 获取创建后 小包单号对应的id,bl_id
# package_no_str = '(%s)' % str(create_package_no_arr)[1:-1]
# sql = 'select id,logistic_order_no,bl_id from cc_ship_package where logistic_order_no in %s' % package_no_str
# new_order_arr = pd.read_sql(sql, con=db_handle)
# for new_order_data in new_order_arr.itertuples():
# package_no_id_res[new_order_data.logistic_order_no] = (new_order_data.id, new_order_data.bl_id)
# if delete_goods_ids: # 删除小包商品
# ids = '(%s)' % str(delete_goods_ids)[1:-1]
# delete_goods_sql = 'delete from cc_package_good where id in %s' % ids
# pd.read_sql(delete_goods_sql, con=db_handle, chunksize=100)
# if package_goods_vals_arr:
# for package_goods_vals in package_goods_vals_arr:
# logistic_order_no = package_goods_vals['bl_line_id']
# data_list = package_no_id_res.get(logistic_order_no, [])
# package_goods_vals['bl_line_id'] = data_list[0] if data_list else None
# package_goods_vals['bl_id'] = data_list[1] if data_list else None
# val_df = pd.DataFrame(package_goods_vals_arr)
# val_df.to_sql('cc_package_good', con=db_handle, if_exists='append', index=False)
# except Exception as err:
# return_res['all_result'] = False
# return_res['failed_provider_order_ids'].append(
# logistic_order_no)
# return_res['err_msg'].update(
# {logistic_order_no: str(err)})
# _logger.error('package_declare error:%s' %
# str(err))
# msg = str(err)
# val = {
# 'big_bag_no': logistic_order_no,
# 'error_msg': msg,
# 'push_time': utc_time,
# 'data_text': data_text,
# 'success_bl': return_res['all_result'],
# 'request_id': kw['request_id'],
# 'source': '推入',
# 'create_date': datetime.utcnow()
# }
# val_df = pd.DataFrame(val, index=[0])
# val_df.to_sql('ao_tt_api_log', con=db_handle, if_exists='append', index=False)
# return return_res
@rpc
def temu_cancel_order_service(self, **kws):
# 接收取消订单
return_res = {
"msg": ''
}
order_no = ''
kw = {}
utc_time = ''
try:
kw = kws['data']
order_no = kw['orderNo']
logistics_order_no = kw['logisticsOrderNo']
sql = "select id from temu_order where temu_delivery_no=%s and logistics_order_no=%s"
temu_order_id = ''
temu_order_sql_result = pd.read_sql(sql, con=db_handle, params=(order_no, logistics_order_no))
utc_time = datetime.utcnow()
for temu_res in temu_order_sql_result.itertuples():
temu_order_id = temu_res.id
if temu_order_id:
update_cancel_sql = "update temu_order set state='cancel',cancel_date=%s where id=%s"
pd.read_sql(update_cancel_sql, con=db_handle, params=(temu_order_id, utc_time.strftime('%Y-%m-%d %H:%M:%S')), chunksize=100)
else:
return_res['msg'] = '系统未查询到订单'
except Exception as err:
return_res['code'] = 500
return_res['msg'] = str(err)
_logger.error('package_cancel error:%s' %
str(err))
val = {
'big_bag_no': 'Temu订单取消: %s' % order_no,
'error_msg': return_res['msg'],
'push_time': utc_time,
'data_text': json.dumps(kw),
'success_bl': False if return_res['msg'] else True,
'request_id': kws['request_id'],
'source': '推入',
'create_date': datetime.utcnow()
}
val_df = pd.DataFrame(val, index=[0])
val_df.to_sql('ao_tt_api_log', con=db_handle, if_exists='append', index=False)
return return_res
# !/usr/bin/python
# -*- coding:utf-8 -*-
# Author: Zhichang Fu
# Created Time: 2019-03-26 23:11:14
'''
BEGIN
function:
Hello Service
END
'''
import ast
import copy
import json
import sys
import time
import requests
import logging
import hashlib
import uuid
import redis
from random import Random
import pandas as pd
from nameko.rpc import rpc, RpcProxy
from datetime import datetime, timedelta, timezone
from dependence.services import config
from dependence.services import DbService
from dependence.services.util import YhjCommon, Order_dispose
from dependence.services.redis_service import RedisClient
from logging.handlers import RotatingFileHandler
# from line_profiler import LineProfiler
# # 创建一个 LineProfiler 实例
# profiler = LineProfiler()
# sys.path.append("..")
logging.basicConfig(level=logging.INFO) # 调试debug级
# 创建日志记录器,指明日志保存的路径、每个日志文件的最大大小、保存的日志文件个数上限
file_log_handler = RotatingFileHandler("logs/tiktok_queue.log", maxBytes=1024 * 1024 * 100, backupCount=15, encoding='utf-8')
# 创建日志记录的格式 日志等级 输入日志信息的文件名 行数 日志信息
formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')
# 为刚创建的日志记录器设置日志记录格式
file_log_handler.setFormatter(formatter)
# 为全局的日志工具对象(flask app使用的)添加日志记录器
logging.getLogger().addHandler(file_log_handler)
# logging.basicConfig(handlers=[logging.FileHandler('logs/tiktok_test.py.log', 'a', 'utf-8')],
# format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
_logger = logging.getLogger(__name__)
# 只会运行的时候连接一次
db_handle = DbService().conn_engine
# odoo_conn = Order_dispose()
# common = YhjCommon()
# redis_client = RedisClient()
# redis_obj = redis_client.get_redis(config.REDIS_NAME, config.REDIS_HOST, config.REDIS_PORT, config.REDIS_DB)
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
def retry_call_with_args(func, max_retries=3, retry_delay=1, **kwargs):
for attempt in range(max_retries):
try:
res = func(**kwargs)
if not res.get('err_msg'):
return
else:
logging.warning(f"{func.__name__} 第 {attempt+1} 次执行失败,尝试重试")
except Exception as e:
logging.error(f"{func.__name__} 第 {attempt+1} 次抛出异常: {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
logging.error(f"{func.__name__} 最多重试 {max_retries} 次失败,放弃,数据:{kwargs}")
class Order_dispose(object):
def get_rf_to_time(self, rfc3339_string):
# 将RFC 3339时间字符串解析为datetime对象
dt = datetime.fromisoformat(rfc3339_string.replace('Z', '+00:00'))
# 如果需要将其转换为UTC时区,可以显式设置时区
dt_utc = dt.replace(tzinfo=timezone.utc)
# 输出结果
dt_utc = dt_utc.strftime("%Y-%m-%d %H:%M:%S")
print("Parsed datetime (UTC):", dt_utc)
return dt_utc
def tiktok_package_declare(self, **kw):
# 接收清关包裹信息
msg = ''
return_res = {
"all_result": True,
"failed_provider_order_ids": [],
"err_msg": {},
}
logistic_order_no = ''
data_text = ''
utc_time = ''
try:
push_interval_15_days = 15
tt_customer_id = None
# 查询过滤最近天数 查询默认客户
sql = "select key,value from ir_config_parameter where key='tk_push_interval_15_days' or key='tt_customer_id'"
result_arr = pd.read_sql(sql, con=db_handle)
for res in result_arr.itertuples():
if res.key == 'tk_push_interval_15_days':
push_interval_15_days = int(res.value)
elif res.key == 'tt_customer_id':
tt_customer_id = int(res.value)
# 查询默认小包状态
node_id = None
sql = "select id from cc_node where node_type='package' and is_default=True limit 1"
node_arr = pd.read_sql(sql, con=db_handle)
for node_res in node_arr.itertuples():
node_id = int(node_res.id)
parcel_arr = kw['packages']
date = datetime.now() - timedelta(days=int(push_interval_15_days))
create_package_arr = [] # 创建小包vals数据
delete_goods_ids = [] # 删除的小包商品
package_no_id_res = {} # 维护小包号和id的对应
create_package_no_arr = [] # 创建的小包号列表
package_goods_vals_arr = [] # 创建的小包商品vals数据
data_text = json.dumps(parcel_arr)
utc_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
for package in parcel_arr:
try:
logistic_order_no = package.get('provider_order_id')
sql = "select id,bl_id,is_cancel from cc_ship_package where logistic_order_no=%s and create_date >= %s"
parcel_sql_result = pd.read_sql(sql, con=db_handle, params=(logistic_order_no, date))
parcel_id = ''
parcel_bl_id = ''
for parcel_res in parcel_sql_result.itertuples():
parcel_id = parcel_res.id
parcel_bl_id = parcel_res.bl_id
parcel_is_cancel = parcel_res.is_cancel
if (parcel_id and (not parcel_bl_id or
parcel_is_cancel)) or not parcel_id:
ship_package_vals = dict(create_date=utc_time, state=node_id,
is_cancel=False, cancel_reason='',
logistic_order_no=package.get(
'provider_order_id'),
tracking_no=package.get(
'tracking_no'),
customer_ref=package.get(
'declaretion_bill_id'),
internal_account_number="",
user_track_note=package.get(
'remark'),
company_code=package.get(
'entity_code'),
trade_no=package.get(
'order_no'),
# 需要将时间搓转换为时间
operation_time=datetime.fromtimestamp(
int(package.get('operate_time')) / 1000).strftime(
'%Y-%m-%d %H:%M:%S'),
big_package_no=package.get(
'big_bag_no'),
container_no=package.get(
'container_no'),
buyer_region=package.get(
'buyer_region'),
next_provider_name=package.get(
'next_provider_name'),
sender_name=package.get(
'sender_info').get('name'),
sender_vat_no=package.get(
'sender_info').get('shipping_tax_id'),
sender_phone=package.get(
'sender_info').get('phone'),
sender_country=package.get('sender_info').get('address').get(
'address_l0'),
sender_state=package.get('sender_info').get('address').get(
'address_l1'),
sender_city=package.get('sender_info').get(
'address').get('address_l2'),
sender_add_1=package.get('sender_info').get('address').get(
'address_l3'),
sender_add_2=package.get('sender_info').get('address').get(
'address_l4'),
sender_add_3=package.get('sender_info').get(
'address').get('details'),
sender_postcode=package.get(
'sender_info').get('postcode'),
receiver_name=package.get(
'receiver_info').get('name'),
receiver_phone=package.get(
'receiver_info').get('phone'),
receiver_postcode=package.get(
'receiver_info').get('postcode'),
receiver_add_1=package.get('receiver_info').get('address').get(
'address_l0'),
receiver_add_2=package.get('receiver_info').get('address').get(
'address_l1'),
receiver_add_3=package.get('receiver_info').get('address').get(
'address_l2'),
receiver_city=package.get('receiver_info').get('address').get(
'address_l3'),
receiver_county=package.get('receiver_info').get('address').get(
'address_l4'),
receiver_detailed_address=package.get(
'receiver_info').get('address').get('details'),
receiver_vat_no=package.get(
'receiver_info').get('tax_id'),
currency=package.get(
'currency'),
gross_weight=package.get(
'package').get('real_weight'),
weight_unit=package.get(
'package').get('weight_unit'),
total_value=package.get(
'value').get('goods_value'),
customer_id=tt_customer_id, is_sync=True) # 增加客户信息
if package.get('items') and len(package.get('items')) > 0:
good_id_arr = []
for item in package.get('items'):
item_id = item.get('item_id')
if item_id not in good_id_arr:
good_id_arr.append(item_id)
package_good = dict(create_date=utc_time, bl_line_id=logistic_order_no,
bl_id=False,
item_id=item_id,
sku_id=item.get(
'sku_id'),
item_name_cn=item.get(
'product_name_cn'),
item_name_en=item.get(
'product_name'),
export_hs_code=item.get(
'export_hscode'),
import_hs_code=item.get(
'import_hscode'),
weight=item.get(
'weight') or None,
quantity=item.get(
'qty'),
quantity_unit=item.get(
'unit'),
declare_price=item.get(
'unit_price') or None,
freight=item.get(
'shipping_fee') or None,
cod_amount=item.get(
'cod_fee') or None,
vat_rate=item.get(
'vat_rate') or None,
item_vat=item.get(
'vat_fee') or None,
origin_country=item.get(
'origin_country'),
item_type=item.get(
'item_type') or None,
item_total_price=item.get(
'unit_price') or None,
item_link=item.get(
'item_url'),
item_tax_status=item.get('tax_mark') or None, )
# _logger.info('package_good:%s' % package_good)
if package_good:
package_goods_vals_arr.append(package_good)
if not parcel_id:
# 创建小包
create_package_no_arr.append(logistic_order_no)
create_package_arr.append(ship_package_vals)
else:
# 删除小包商品
package_no_id_res[logistic_order_no] = (parcel_id, parcel_bl_id) # 小包号对应id
sql = 'select id from cc_package_good where bl_line_id = %s'
delete_good_result = pd.read_sql(sql, con=db_handle, params=(parcel_id,))
origin_goods_ids = delete_good_result['id'].tolist()
delete_goods_ids += origin_goods_ids
# 更新小包
update_sql = """
UPDATE cc_ship_package
SET
is_cancel = %s,
cancel_reason = %s,
logistic_order_no = %s,
tracking_no = %s,
customer_ref = %s,
internal_account_number = %s,
user_track_note = %s,
company_code = %s,
trade_no = %s,
operation_time = %s,
big_package_no = %s,
container_no = %s,
buyer_region = %s,
next_provider_name = %s,
sender_name = %s,
sender_vat_no = %s,
sender_phone = %s,
sender_country = %s,
sender_state = %s,
sender_city = %s,
sender_add_1 = %s,
sender_add_2 = %s,
sender_add_3 = %s,
sender_postcode = %s,
receiver_name = %s,
receiver_phone = %s,
receiver_postcode = %s,
receiver_add_1 = %s,
receiver_add_2 = %s,
receiver_add_3 = %s,
receiver_city = %s,
receiver_county = %s,
receiver_detailed_address = %s,
receiver_vat_no = %s,
currency = %s,
gross_weight = %s,
weight_unit = %s,
total_value = %s,
customer_id = %s
WHERE id = %s;
"""
# 构建参数列表
params = (
False, # is_cancel
'', # cancel_reason
ship_package_vals['logistic_order_no'],
ship_package_vals['tracking_no'],
ship_package_vals['customer_ref'],
'', # internal_account_number
ship_package_vals['user_track_note'],
ship_package_vals['company_code'],
ship_package_vals['trade_no'],
ship_package_vals['operation_time'],
ship_package_vals['big_package_no'],
ship_package_vals['container_no'],
ship_package_vals['buyer_region'],
ship_package_vals['next_provider_name'],
ship_package_vals['sender_name'],
ship_package_vals['sender_vat_no'],
ship_package_vals['sender_phone'],
ship_package_vals['sender_country'],
ship_package_vals['sender_state'],
ship_package_vals['sender_city'],
ship_package_vals['sender_add_1'],
ship_package_vals['sender_add_2'],
ship_package_vals['sender_add_3'],
ship_package_vals['sender_postcode'],
ship_package_vals['receiver_name'],
ship_package_vals['receiver_phone'],
ship_package_vals['receiver_postcode'],
ship_package_vals['receiver_add_1'],
ship_package_vals['receiver_add_2'],
ship_package_vals['receiver_add_3'],
ship_package_vals['receiver_city'],
ship_package_vals['receiver_county'],
ship_package_vals['receiver_detailed_address'],
ship_package_vals['receiver_vat_no'],
ship_package_vals['currency'],
ship_package_vals['gross_weight'],
ship_package_vals['weight_unit'],
ship_package_vals['total_value'],
tt_customer_id,
parcel_id # id
)
pd.read_sql(update_sql, con=db_handle, params=params, chunksize=100)
except Exception as err:
return_res['all_result'] = False
return_res['failed_provider_order_ids'].append(
package.get('provider_order_id'))
return_res['err_msg'].update(
{package.get('provider_order_id'): str(err)})
msg = str(err)
_logger.error('package_declare error:%s' %
str(err))
_logger.error('小包异常: %s' % kw)
# 创建小包
# print('create_package_arr', create_package_arr)
with db_handle.begin() as connect:
if create_package_arr:
val_df = pd.DataFrame(create_package_arr)
val_df.to_sql('cc_ship_package', con=connect, if_exists='append', index=False)
# 获取创建后 小包单号对应的id,bl_id
package_no_str = '(%s)' % str(create_package_no_arr)[1:-1]
sql = 'select id,logistic_order_no,bl_id from cc_ship_package where logistic_order_no in %s' % package_no_str
new_order_arr = pd.read_sql(sql, con=connect)
for new_order_data in new_order_arr.itertuples():
package_no_id_res[new_order_data.logistic_order_no] = (new_order_data.id, new_order_data.bl_id)
if delete_goods_ids: # 删除小包商品
ids = '(%s)' % str(delete_goods_ids)[1:-1]
delete_goods_sql = 'delete from cc_package_good where id in %s' % ids
pd.read_sql(delete_goods_sql, con=connect, chunksize=100)
if package_goods_vals_arr:
for package_goods_vals in package_goods_vals_arr:
logistic_order_no = package_goods_vals['bl_line_id']
data_list = package_no_id_res.get(logistic_order_no, [])
package_goods_vals['bl_line_id'] = data_list[0] if data_list else None
package_goods_vals['bl_id'] = data_list[1] if data_list else None
val_df = pd.DataFrame(package_goods_vals_arr)
val_df.to_sql('cc_package_good', con=connect, if_exists='append', index=False)
except Exception as err:
return_res['all_result'] = False
return_res['failed_provider_order_ids'].append(
logistic_order_no)
return_res['err_msg'].update(
{logistic_order_no: str(err)})
_logger.error('package_declare error:%s' %
str(err))
msg = str(err)
_logger.error('小包异常: %s' % kw)
val = {
'big_bag_no': logistic_order_no,
'error_msg': msg,
'push_time': utc_time,
'data_text': data_text,
'success_bl': return_res['all_result'],
'request_id': kw['request_id'],
'source': '推入',
'create_date': datetime.utcnow()
}
val_df = pd.DataFrame(val, index=[0])
val_df.to_sql('ao_tt_api_log', con=db_handle, if_exists='append', index=False)
return return_res
def mawb_declare(self, **kws):
# 接收提单信息、大包与小包的关联信息
return_res = {
"err_msg": '',
}
master_waybill_no = ''
data_text = ''
utc_time = ''
try:
push_interval_15_days = 15
tt_customer_id = None
# 查询过滤最近天数 查询默认客户
sql = "select key,value from ir_config_parameter where key='tk_push_interval_15_days' or key='tt_customer_id'"
result_arr = pd.read_sql(sql, con=db_handle)
for res in result_arr.itertuples():
if res.key == 'tk_push_interval_15_days':
push_interval_15_days = int(res.value)
elif res.key == 'tt_customer_id':
tt_customer_id = int(res.value)
# 查询默认提单状态
node_id = None
sql = "select id from cc_node where node_type='bl' and is_default=True limit 1"
node_arr = pd.read_sql(sql, con=db_handle)
for node_res in node_arr.itertuples():
node_id = int(node_res.id)
# 获取提单号 提单信息
master_waybill_no = kws['master_waybill_no']
mawb_info = kws.get('mwb_info')
date = datetime.now() - timedelta(days=int(push_interval_15_days))
data_text = json.dumps(kws)
utc_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
declare_type = kws.get('declare_type')
if declare_type:
declare_type = declare_type.lower()
# 提单数据
bl_vals = dict(bl_no=master_waybill_no,
customs_bl_no=kws.get('customs_waybill_id'),
trade_type=kws.get('trade_type'),
big_package_qty=kws.get('big_bag_quantity'),
big_package_sell_country=kws.get(
'buyer_region'),
declare_type=kws.get('declare_type'),
transport_tool_code=mawb_info.get(
'transport_code'),
transport_tool_name=mawb_info.get(
'transport_name'),
start_port_code=mawb_info.get(
'depart_port_code'),
end_port_code=mawb_info.get('arrive_port_code'),
billing_weight=mawb_info.get(
'chargable_weight') or None,
actual_weight=mawb_info.get('real_weight') or None,
etd=mawb_info.get('etd'),
eta=mawb_info.get('eta'),
customer_id=tt_customer_id,
is_cancel=False,
customs_clearance_status=node_id, state='draft', create_date=utc_time, is_bl_sync=True, bl_date=datetime.now().strftime('%Y-%m-%d'),
cc_deadline=(datetime.now()+timedelta(days=5)).strftime('%Y-%m-%d'))
# big_package_qty
# 查询最近15天内取消的提单号
bl_id = ''
bl_state = ''
sql = "select id,state from cc_bl where bl_no=%s and is_cancel=False and create_date >= %s limit 1"
waybill_sql_result = pd.read_sql(sql, con=db_handle, params=(master_waybill_no, date))
for waybill_sql_res in waybill_sql_result.itertuples():
bl_id = waybill_sql_res.id
bl_state = waybill_sql_res.state
exit_bl_obj = []
if declare_type == 'create':
# 检查 如果类型是create 根据提单号和大包数量查询到了数据 就不做处理
select_bl_sql = """select id from cc_bl where bl_no='{0}' and big_package_qty={1};""".format(
kws.get('master_waybill_no'), kws.get('big_bag_quantity'))
select_sql_result = pd.read_sql(select_bl_sql, con=db_handle)
exit_bl_obj = select_sql_result
if not len(waybill_sql_result):
if declare_type == 'create':
val_df = pd.DataFrame(bl_vals, index=[0])
val_df.to_sql('cc_bl', con=db_handle, if_exists='append', index=False)
file_name_arr = ['主单', '货站提货POD',
'Manifest格式和数据(cvs/excel格式,系统目前不支持,线下提供或保留现有方式)',
'海关CDS申报单(import和授权方式检查拉齐等)', '尾程交接POD(待大包数量和箱号)']
sql = "select id,state from cc_bl where bl_no=%s limit 1"
new_bl_result = pd.read_sql(sql, con=db_handle, params=(master_waybill_no,))
# new_bl_id = ''
for new_bl_res in new_bl_result.itertuples():
bl_id = new_bl_res.id
if bl_id:
file_vals_arr = [{'file_name': i, 'bl_id': bl_id} for i in file_name_arr]
val_df = pd.DataFrame(file_vals_arr)
val_df.to_sql('cc_clearance_file', con=db_handle, if_exists='append', index=False)
else:
if declare_type == 'update' and bl_state == 'draft':
update_sql = """
UPDATE cc_bl
SET
customs_bl_no = %s,
trade_type = %s,
big_package_qty = %s,
big_package_sell_country = %s,
declare_type = %s,
transport_tool_code = %s,
transport_tool_name = %s,
start_port_code = %s,
end_port_code = %s,
billing_weight = %s,
actual_weight = %s,
etd = %s,
eta = %s,
customer_id = %s,
customs_clearance_status = %s
WHERE id = %s;
"""
params = (
kws.get('customs_waybill_id'),
kws.get('trade_type'),
kws.get('big_bag_quantity'),
kws.get('buyer_region'),
kws.get('declare_type'),
mawb_info.get('transport_code'),
mawb_info.get('transport_name'),
mawb_info.get('depart_port_code'),
mawb_info.get('arrive_port_code'),
mawb_info.get('chargable_weight') or None,
mawb_info.get('real_weight') or None,
mawb_info.get('etd'),
mawb_info.get('eta'),
tt_customer_id,
node_id,
bl_id # WHERE条件的值
)
pd.read_sql(update_sql, con=db_handle, params=params, chunksize=100)
if bl_id and ((declare_type == 'create' and len(exit_bl_obj) <= 0) or (
declare_type == 'update' and bl_state == 'draft')):
big_bag_list = kws.get('big_bag_list')
if big_bag_list and len(big_bag_list) > 0:
# 删除提单关联的所有大包 100多条大包数据
delete_big_sql = "delete from cc_big_package where bl_id=%s"
pd.read_sql(delete_big_sql, con=db_handle, params=(bl_id,), chunksize=100)
# 更新提单所有小包
update_package_sql = "update cc_ship_package set bl_id=null,big_package_id=null where bl_id=%s"
pd.read_sql(update_package_sql, con=db_handle, params=(bl_id,), chunksize=100)
# 批量该提单创建所有大包
big_bag_vals_arr = [
{
'bl_id': bl_id,
'big_package_no': big_bag.get(
'big_bag_no'),
'next_provider_name': big_bag.get('next_provider_name'),
'create_date': utc_time,
'tally_state': 'unprocessed_goods'
} for big_bag in big_bag_list
]
val_df = pd.DataFrame(big_bag_vals_arr)
val_df.to_sql('cc_big_package', con=db_handle, if_exists='append', index=False)
# 批量查询该提单所有大包信息
big_bag_no_list = [big_bag.get('big_bag_no') for big_bag in big_bag_list]
big_bag_no_placeholder = ', '.join(['%s'] * len(big_bag_no_list))
big_sql = f"SELECT id, big_package_no FROM cc_big_package WHERE big_package_no IN ({big_bag_no_placeholder}) AND create_date >= %s"
big_package_results = pd.read_sql(big_sql, con=db_handle, params=big_bag_no_list + [date])
big_package_dict = {row['big_package_no']: row['id'] for row in
big_package_results.to_dict(orient='records')}
for big_bag in big_bag_list:
big_bag_no = big_bag.get('big_bag_no')
package_list = big_bag.get(
'package_list', [])
big_package_id = big_package_dict.get(big_bag_no)
# 检查big_bag_no是否已经存在
# big_package_vals = dict(bl_id=bl_id,
# big_package_no=big_bag.get(
# 'big_bag_no'),
# next_provider_name=big_bag.get('next_provider_name'), create_date=utc_time)
# big_sql = "select id from cc_big_package where big_package_no=%s and create_date >= %s limit 1"
# big_package_id = ''
# big_sql_result = pd.read_sql(big_sql, con=db_handle, params=(big_bag_no, date))
# for big_sql_res in big_sql_result.itertuples():
# big_package_id = big_sql_res.id
# if not big_package_id:
# val_df = pd.DataFrame(big_package_vals, index=[0])
# val_df.to_sql('cc_big_package', con=db_handle, if_exists='append', index=False)
# sql = "select id from cc_big_package where big_package_no=%s and bl_id=%s and create_date >= %s limit 1"
# new_big_result = pd.read_sql(sql, con=db_handle, params=(big_bag.get(
# 'big_bag_no'), bl_id, date))
# # new_bl_id = ''
# for new_big_res in new_big_result.itertuples():
# big_package_id = new_big_res.id
# else:
# update_big_sql = "update cc_big_package set bl_id=%s,big_package_no=%s,next_provider_name=%s where id=%s"
# pd.read_sql(update_big_sql, con=db_handle, params=(bl_id, big_bag.get(
# 'big_bag_no'), big_bag.get(
# 'next_provider_name'), big_package_id), chunksize=100)
# 生成cc.ship.package
if package_list and len(package_list) > 0:
package_ids = [package.get(
'provider_order_id') for package in package_list] # 传来的大包下的小包列表
# print(tuple(package_ids))
package_ids_str = '(%s)' % str(package_ids)[1:-1]
package_sql = "select id,logistic_order_no from cc_ship_package where logistic_order_no in {} and create_date >= %s".format(package_ids_str)
# print(package_sql)
package_sql_result = pd.read_sql(package_sql, con=db_handle, params=(date,))
if len(package_sql_result) > 0:
package_id_arr = package_sql_result['id'].tolist()
ids_str = '(%s)' % str(package_id_arr)[1:-1]
update_ship_sql = "update cc_ship_package set is_cancel=False,cancel_reason='',big_package_id=%s,bl_id=%s where id in {0}".format(ids_str)
pd.read_sql(update_ship_sql, con=db_handle, params=(big_package_id, bl_id), chunksize=100)
# 更新小包商品关联id
update_ship_sql = "update cc_package_good set big_package_id=%s,bl_id=%s where bl_line_id in {0}".format(
ids_str)
pd.read_sql(update_ship_sql, con=db_handle, params=(big_package_id, bl_id),
chunksize=100)
if len(package_ids) != len(package_sql_result):
# 找出ship_packages没有找到的package
logistic_order_no_arr = package_sql_result['logistic_order_no'].tolist()
package_ids = set(package_ids)
ship_package_ids = set(logistic_order_no_arr)
diff_ids = package_ids - ship_package_ids
_logger.info('diff_ids:%s' % diff_ids)
else:
return_res['err_msg'] = 'Big bag [%s] not include any package. ' % big_bag.get(
'big_bag_no')
else:
return_res['err_msg'] = 'Big bag list is empty.'
if not return_res['err_msg']:
# 根据商品分组 更新提单大包数量 小包数量 商品数量
update_bl_info_sql = """
UPDATE cc_bl SET bl_total_big_qty = s.big_num, bl_total_qty=s.good_num, bl_ship_package_qty=s.ship_num from (
SELECT bl_id, COUNT(DISTINCT big_package_id) as big_num , COUNT(DISTINCT bl_line_id) as ship_num, COUNT(*) AS good_num
FROM cc_package_good
WHERE bl_id = {0}
GROUP BY bl_id
) s
where id = s.bl_id
;
""".format(bl_id)
pd.read_sql(update_bl_info_sql, con=db_handle, chunksize=100)
# 更新提单下的大包的 小包数量 商品数量
sql = "select id from cc_big_package where bl_id=%s"
big_sql_result = pd.read_sql(sql, con=db_handle, params=(bl_id,))
big_sql_result_ids = big_sql_result['id'].tolist()
big_ids_str = '(%s)' % str(big_sql_result_ids)[1:-1]
update_big_info_sql = """
UPDATE cc_big_package SET ship_package_qty = s.ship_num, goods_qty=s.good_num from (
SELECT big_package_id, COUNT(DISTINCT bl_line_id) as ship_num, COUNT(*) AS good_num
FROM cc_package_good
WHERE big_package_id IN {0}
GROUP BY big_package_id
) s
where id = s.big_package_id
;
""".format(big_ids_str)
pd.read_sql(update_big_info_sql, con=db_handle,
chunksize=100)
# 更新提单总金额
update_bl_amount_sql = """
UPDATE cc_bl
SET bl_total_amount = (
SELECT SUM(total_value)
FROM cc_ship_package
WHERE bl_id = %s
)
WHERE id = %s;
"""
pd.read_sql(update_bl_amount_sql, con=db_handle, params=(bl_id, bl_id,), chunksize=100)
# 更新提单的尾程服务商 - 简化高效版本
try:
_logger.info(f"开始更新提单 {bl_id} 的尾程服务商")
# 先清空现有关联
clear_sql = "DELETE FROM cc_bill_loading_last_mile_provider_rel WHERE bl_id = %s"
pd.read_sql(clear_sql, con=db_handle, params=(bl_id,), chunksize=100)
# 获取提单下所有大包的下一个快递名称
get_names_sql = """
SELECT DISTINCT LOWER(TRIM(bp.next_provider_name)) as provider_name
FROM cc_big_package bp
WHERE bp.bl_id = %s
AND bp.next_provider_name IS NOT NULL
AND bp.next_provider_name != ''
"""
names_result = pd.read_sql(get_names_sql, con=db_handle, params=(bl_id,))
_logger.info(f"提单 {bl_id} 找到 {len(names_result)} 个快递名称")
if not names_result.empty:
# 获取所有尾程快递的匹配值
get_providers_sql = """
SELECT id, matching_value
FROM cc_last_mile_provider
WHERE matching_value IS NOT NULL AND matching_value != ''
"""
providers_result = pd.read_sql(get_providers_sql, con=db_handle)
_logger.info(f"找到 {len(providers_result)} 个尾程快递配置")
# 构建匹配映射
provider_mapping = {}
for _, provider_row in providers_result.iterrows():
provider_id = provider_row['id']
matching_values = [value.lower().strip() for value in provider_row['matching_value'].split('\n') if value.strip()]
for value in matching_values:
if value not in provider_mapping:
provider_mapping[value] = []
provider_mapping[value].append(provider_id)
# 查找匹配的provider
matched_provider_ids = set()
for _, name_row in names_result.iterrows():
provider_name = name_row['provider_name']
if provider_name in provider_mapping:
matched_provider_ids.update(provider_mapping[provider_name])
_logger.info(f"匹配到快递名称: {provider_name} -> {provider_mapping[provider_name]}")
# 批量插入匹配结果
if matched_provider_ids:
insert_data = [(bl_id, provider_id) for provider_id in matched_provider_ids]
insert_df = pd.DataFrame(insert_data, columns=['bl_id', 'last_mile_provider_id'])
# 使用事务确保数据一致性
with db_handle.begin() as conn:
insert_df.to_sql('cc_bill_loading_last_mile_provider_rel', con=conn,
if_exists='append', index=False, method='multi')
_logger.info(f"成功插入 {len(matched_provider_ids)} 个尾程服务商关联")
else:
_logger.warning(f"提单 {bl_id} 没有匹配到任何尾程服务商")
else:
_logger.warning(f"提单 {bl_id} 没有找到任何快递名称")
except Exception as e:
_logger.error(f"更新尾程服务商失败: {str(e)}")
_logger.error(f"提单ID: {bl_id}")
# 不抛出异常,继续执行其他逻辑
except Exception as err:
return_res['err_msg'] = str(err)
_logger.error('mawb_declare error:%s' %
str(err))
_logger.error('提单异常: %s' % kws)
val = {
'big_bag_no': '提单下发: %s' % master_waybill_no,
'error_msg': return_res['err_msg'],
'push_time': utc_time,
'data_text': data_text,
'success_bl': False if return_res['err_msg'] else True,
'request_id': kws['request_id'],
'source': '推入',
'create_date': datetime.utcnow()
}
val_df = pd.DataFrame(val, index=[0])
val_df.to_sql('ao_tt_api_log', con=db_handle, if_exists='append', index=False)
# return_res
return return_res
try:
pool = redis.ConnectionPool(**config.redis_options)
r = redis.Redis(connection_pool=pool)
logging.info(u'redis连接成功')
Order_dispose = Order_dispose()
while 1:
try:
result = r.brpop(['tiktok_parcel_data'], 0)
# print(result[1])
data1 = result[1]
task_data = json.loads(data1)
# response_data = Order_dispose.tiktok_package_declare(data1)
# 根据类型进行分发是 处理小包还是提单tiktok_package_declare
data_type = task_data.get('type')
handle_data = task_data['result']
if data_type == 'package':
executor.submit(retry_call_with_args, Order_dispose.tiktok_package_declare, 3, 1, **handle_data)
elif data_type == 'mawb':
executor.submit(retry_call_with_args, Order_dispose.mawb_declare, 3, 1, **handle_data)
else:
logging.error('未找到数据类型')
except Exception as e:
logging.error('error: %s' % str(e))
continue
except Exception as e:
logging.error("登录失败:%s" % e)
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论