提交 50fbbe32 authored 作者: 贺阳's avatar 贺阳

队列文件删除

上级 7744efbc
# coding=utf-8
# Local-environment connection settings.

# Odoo RPC connection (used by the consumer scripts via odoorpc).
db_ip = "127.0.0.1"
db_port = "8069"
db_name = "qg"
# BUG FIX: `db_user = odoo` referenced an undefined bare name (NameError the
# moment this module is imported) and the password was a bare int; both are
# now quoted, consistent with the commented test/production variants below.
db_user = "odoo"
db_password = "111025"

# Redis connection for the job queues.
redis_options = dict(
    host='127.0.0.1',
    port=6379,
    # password='topodoo1314',
    decode_responses=True,
    db=0
)

# PostgreSQL connection used by the push-log consumer.
postgresql_options = dict(
    host="127.0.0.1",
    port=5432,
    database="ao",
    user="odoo",
    password="odoo",
)
# 测试
# db_ip = "121.199.167.133"
# db_port = "8369"
# db_name = "airorder0309"
# db_user = "admin"
# db_password = "123123"
#
# redis_options = dict(
# host='172.18.0.6',
# port=6379,
# # password='top123',
# decode_responses=True,
# db=3
# )
# postgresql_options = dict(
# host="172.18.0.2",
# port=5432,
# database="airorder0309",
# user="ao",
# password="ao",
# )
# product
# db_ip = "172.18.183.214"
# db_port = "9169"
# db_name = "air_order"
# db_user = "admin"
# db_password = "YHB1408ups"
#
# redis_options = dict(
# host='172.18.183.214',
# port=32768,
# password='top123',
# decode_responses=True,
# db=3
# )
# postgresql_options = dict(
# host="pgm-wz94v126235u6syw.pg.rds.aliyuncs.com",
# port="3433",
# database="air_order",
# user="ao",
# password="ao888123../",
# )
#
# coding=utf-8
import json
import logging
import redis
import time
import requests
import odoorpc
from requests.adapters import HTTPAdapter
import config
from util import AoUtil
# Default file encoding would be the platform default (gbk on Chinese Windows).
# logging.basicConfig(filename='./push_data_logger.log', level=logging.INFO)
# Force the log file to utf-8 so Chinese messages are written correctly.
# NOTE: FileHandler fails at import time if the 'logs' directory is missing.
logging.basicConfig(handlers=[logging.FileHandler('logs/flight_order_logger.log', 'a', 'utf-8')],
format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Shared helper instance (JSON-response check, AES helpers, time conversion).
common = AoUtil()
class Order_dispose(object):
    """Consumer worker that pushes flight-order data to the remote yhj API.

    Jobs are popped from the ``flight_order_list`` redis queue; each job dict
    carries ``action`` ('post'/'put'/anything-else-for-delete), ``url``,
    ``data``, ``headers`` and optional bookkeeping keys (``need_return``,
    ``change_state``, ``model_ids``, ``trigger_description``).
    """

    def __init__(self):
        # RPC connection back to Odoo, used to create time nodes after a
        # successful push.
        self.odoo_db = odoorpc.ODOO(config.db_ip, port=config.db_port)
        self.odoo_db.login(config.db_name, config.db_user, config.db_password)

    def order_data(self, data):
        """Push one queued job to the remote API and return a log record.

        :param data: job dict popped from the redis queue.
        :return: a log dict destined for the ``yhj_api_log_response_data``
            queue, or ``[]`` when the response was not JSON or an error
            occurred (errors are logged, never raised).
        """
        res_data = []
        try:
            logging.info('flight_order')
            s = requests.Session()
            # s.mount('http://', HTTPAdapter(max_retries=3))
            # requests' built-in retry: 2 retries, i.e. up to 3 requests total.
            s.mount('https://', HTTPAdapter(max_retries=2))
            # timeout=(connect, read)
            if data['action'] == 'post':
                response = s.post(url=data['url'], data=json.dumps(data['data']),
                                  headers=data['headers'], timeout=(180, 180))
            elif data['action'] == 'put':
                response = s.put(url=data['url'], data=json.dumps(data['data']),
                                 headers=data['headers'], timeout=(180, 180))
            else:
                response = s.delete(url=data['url'], data=json.dumps(data['data']),
                                    headers=data['headers'], timeout=(180, 180))
            logging.info('flight_order_get_response')
            if common.check_json(response):
                res = response.json()
                # BUG FIX: `val` was used without ever being initialised, so
                # every success path raised NameError (swallowed by the broad
                # except below and the job was silently dropped).  Build the
                # log record up front, mirroring the time_node consumer.
                # NOTE(review): model_description value mirrored from the
                # sibling consumer — confirm the exact expected label.
                val = {'model_name': 'ao.flight.order',
                       'model_description': '航空运单',
                       'trigger_description': data.get('trigger_description'),
                       'model_id': data.get('model_ids')}
                if res['code'] == 30200:
                    val['success_bl'] = True
                    # Creating a lading bill succeeded: also create its time
                    # nodes in Odoo.
                    if data.get('need_return') and data['action'] == 'post':
                        self.odoo_db.execute("yhj.api", "time_node_create", ["self"],
                                             data['change_state'], data['model_ids'])
                elif data.get('need_return') and res['code'] == 30204 and data['action'] == 'put':
                    # 30204 on a PUT: the order is missing remotely — create it
                    # first, then create the time nodes.
                    self.odoo_db.execute("yhj.api", "flight_order_create", ["self"],
                                         data['change_state'], data['model_ids'])
                    self.odoo_db.execute("yhj.api", "time_node_create", ["self"],
                                         data['change_state'], data['model_ids'])
                    # NOTE(review): treated as recovered/success — confirm.
                    val['success_bl'] = True
                else:
                    val['success_bl'] = False
                    val['error_msg'] = res.get('message') or ''
                    val['remark'] = res.get('fail_waybills') or ''
                res_data = val
            else:
                logging.info('response text:%s' % response.text)
        except Exception as ex:
            logging.error('flight_order push error:%s' % str(ex))
        return res_data
try:
    # Connect to redis and run the blocking consume loop forever.
    connection_pool = redis.ConnectionPool(**config.redis_options)
    redis_client = redis.Redis(connection_pool=connection_pool)
    logging.info(u'redis连接成功')
    dispatcher = Order_dispose()
    while True:
        try:
            # brpop blocks until a job arrives; returns (queue_name, payload).
            _, raw_job = redis_client.brpop('flight_order_list', 0)
            job = json.loads(raw_job)
            logging.info(job)
            log_record = dispatcher.order_data(job)
            if log_record:
                logging.info('推送数据...%s', log_record)
                redis_client.lpush('yhj_api_log_response_data', json.dumps(log_record))
        except Exception as e:
            # Keep the consumer alive: log and move on to the next job.
            logging.error(e)
            continue
except Exception as e:
    logging.error("登录失败")
    logging.error(e)
This source diff could not be displayed because it is too large. You can view the blob instead.
差异被折叠。
This source diff could not be displayed because it is too large. You can view the blob instead.
import psycopg2
import config
import datetime
import logging
import json
import redis
from util import AoUtil
# Log to a utf-8 file so Chinese messages are written correctly.
# NOTE: FileHandler fails at import time if the 'logs' directory is missing.
logging.basicConfig(handlers=[logging.FileHandler('logs/push_data_logger.log', 'a', 'utf-8')],
format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Shared helper instance (JSON-response check, AES helpers, time conversion).
common = AoUtil()
class Database_Deal():
    """Writes queued API push-log records into PostgreSQL."""

    def push_data(self, data, pg_conn):
        """Insert one log record (popped from redis) into the API-log table.

        :param data: log dict with keys model_name, model_description,
            model_id, trigger_description, success_bl, and optionally
            error_msg / remark / ship_package_objs.
        :param pg_conn: an open psycopg2 connection, reused across calls.
        """
        try:
            logging.info('push log data:%s' % data)
            cursor = pg_conn.cursor()
            # NOTE(review): this INSERT is broken — the column list names 7
            # columns but the VALUES clause interpolates 9 values (8 %s plus
            # the trailing literal), so it fails on every call and the error
            # is swallowed below.  The column names (big_bag_no, data_text,
            # request_id, ...) also do not match the interpolated fields
            # (model_name, trigger_description, ...) — the statement looks
            # half-migrated from another table; confirm the intended schema.
            # NOTE(review): building SQL with % interpolation is injection-
            # prone; prefer cursor.execute(sql, params).
            sql = "insert into ao_tt_api_log(big_bag_no,push_time,error_msg,success_bl,data_text,request_id,source) values('%s','%s','%s','%s','%s','%s','%s','%s','未处理')" % (
            data['model_name'], data['model_description'], data['model_id'], str(datetime.datetime.utcnow()),
            data['trigger_description'], data['success_bl'],
            '' if not data.get('error_msg') else json.dumps(data['error_msg'], ensure_ascii=False),
            '' if not data.get('remark') else json.dumps(data['remark'], ensure_ascii=False))
            cursor.execute(sql)
            # Flight departed: mark shipped waybills as already pushed to yhj,
            # so single-shipment pushes are skipped later.
            if data.get('ship_package_objs'):
                # NOTE(review): eval() on queue-supplied data is dangerous —
                # presumably the value is a string like "(1, 2, 3)"; verify
                # the producer and replace with a parsed parameter.
                sql2 = "update cc_ship_package set is_sync=true where id in %s;" % eval(data['ship_package_objs'])
                cursor.execute(sql2)
            pg_conn.commit()
        except Exception as ex:
            logging.error('push_data error:%s' % str(ex))
try:
    # Connect to redis (queue) and PostgreSQL (log sink), then consume forever.
    pool = redis.ConnectionPool(**config.redis_options)
    r = redis.Redis(connection_pool=pool)
    logging.info('redis连接成功')
    pg_conn = psycopg2.connect(**config.postgresql_options)
    logging.info('postgresql连接成功')
    dd = Database_Deal()
    while True:
        try:
            # brpop blocks until a record arrives; returns (queue, payload).
            result = r.brpop('yhj_api_log_response_data', 0)
            data1 = json.loads(result[1])
            logging.info(data1)
            dd.push_data(data1, pg_conn)
        except Exception as e:
            # BUG FIX: was `logging.error('队列error' % e)` — applying `%` to
            # a format string with no placeholder raises TypeError inside the
            # handler, which escaped the inner try and killed the loop.
            logging.error('队列error:%s' % e)
            continue
except Exception as e:
    logging.error("登录失败")
    logging.error(e)
requests
peewee
# pycrypto removed: it is abandoned and conflicts with pycryptodome (both
# install the `Crypto` package; `from Crypto.Cipher import AES` in util.py is
# satisfied by pycryptodome alone).
pycryptodome
redis
APScheduler
psycopg2
\ No newline at end of file
FROM coding-public-docker.pkg.coding.net/public/docker/python:3.8
RUN mkdir /etc/supervisor
ADD . /mnt/extra-addons
RUN pip3 install -r /mnt/extra-addons/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
RUN mkdir -p /var/log/supervisor/
# BUG FIX: the consumer scripts open logs/<name>.log relative to the workdir
# at import time; logging.FileHandler fails if the directory does not exist,
# so create it explicitly instead of relying on it being in the build context.
RUN mkdir -p /mnt/extra-addons/logs
RUN pip install supervisor -i https://mirrors.aliyun.com/pypi/simple/
ADD ./supervisord_conf/supervisord.conf /etc/supervisor/
# Sync-database workers run from the add-ons directory.
WORKDIR /mnt/extra-addons
EXPOSE 9001
CMD ["supervisord","-n","-c","/etc/supervisor/supervisord.conf"]
[program:flight_order_consumer_1]
process_name=%(program_name)s_%(process_num)02d ; 进程名称
directory = /mnt/extra-addons ; 程序的启动目录
command = /usr/bin/python3 /mnt/extra-addons/flight_order_consumer.py ; 启动命令
autostart = true ; 在 supervisord 启动的时候也自动启动
startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true ; 程序异常退出后自动重启
startretries = 3 ; 启动失败自动重试次数,默认是 3
user = root ; 用哪个用户启动
numprocs=1 ; 进程数
redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 20MB ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20 ; stdout 日志文件备份数
; stdout 日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /var/log/supervisor/flight_order_consumer.log
[program:push_data_consumer_1]
process_name=%(program_name)s_%(process_num)02d ; 进程名称
directory = /mnt/extra-addons ; 程序的启动目录
command = /usr/bin/python3 /mnt/extra-addons/push_data_consumer.py ; 启动命令
autostart = true ; 在 supervisord 启动的时候也自动启动
startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true ; 程序异常退出后自动重启
startretries = 3 ; 启动失败自动重试次数,默认是 3
user = root ; 用哪个用户启动
numprocs=1 ; 进程数
redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 20MB ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20 ; stdout 日志文件备份数
; stdout 日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /var/log/supervisor/push_data_consumer.log
[program:time_node_consumer_1]
process_name=%(program_name)s_%(process_num)02d ; 进程名称
directory = /mnt/extra-addons ; 程序的启动目录
command = /usr/bin/python3 /mnt/extra-addons/time_node_consumer.py ; 启动命令
autostart = true ; 在 supervisord 启动的时候也自动启动
startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true ; 程序异常退出后自动重启
startretries = 3 ; 启动失败自动重试次数,默认是 3
user = root ; 用哪个用户启动
numprocs=1 ; 进程数
redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 20MB ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20 ; stdout 日志文件备份数
; stdout 日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /var/log/supervisor/time_node_consumer.log
; supervisor config file
[unix_http_server]
file=/var/run/supervisor.sock ; (the path to the socket file)
chmod=0700 ; socket file mode (default 0700)
[supervisord]
nodaemon=true
logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
childlogdir=/var/log/supervisor ; ('AUTO' child log dir, default $TEMP)
; the below section must remain in the config file for RPC
; (supervisorctl/web interface) to work, additional interfaces may be
; added by defining them in separate rpcinterface: sections
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL for a unix socket
; The [include] section can just contain the "files" setting. This
; setting can list multiple files (separated by whitespace or
; newlines). It can also contain wildcards. The filenames are
; interpreted as relative to this file. Included files *cannot*
; include files themselves.
[inet_http_server]
port=9001
username=admin
password=admin
[include]
files = /mnt/extra-addons/supervisord_conf/conf.d/*.conf
# coding=utf-8
import json
import logging
import redis
import time
import requests
from requests.adapters import HTTPAdapter
import config
from util import AoUtil
# Default file encoding would be the platform default (gbk on Chinese Windows).
# logging.basicConfig(filename='./push_data_logger.log', level=logging.INFO)
# Force the log file to utf-8 so Chinese messages are written correctly.
# NOTE: FileHandler fails at import time if the 'logs' directory is missing.
logging.basicConfig(handlers=[logging.FileHandler('logs/time_node_logger.log', 'a', 'utf-8')],
format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Shared helper instance (JSON-response check, AES helpers, time conversion).
common = AoUtil()
class Order_dispose(object):
    """Consumer worker that pushes flight-order time-node jobs to the remote API."""

    def order_data(self, data, r):
        """Send one time-node job and return a log record.

        A failed creation with code 31205 is re-queued (up to three attempts),
        since the lading bill may be created remotely in the meantime.

        :param data: job dict popped from the ``time_node_list`` queue.
        :param r: redis client, used to re-queue retryable jobs.
        :return: log dict for ``yhj_api_log_response_data``, or ``[]``.
        """
        res_data = []
        try:
            logging.info('time_node')
            session = requests.Session()
            # s.mount('http://', HTTPAdapter(max_retries=3))
            # requests' built-in retry: 2 retries, i.e. up to 3 requests total.
            session.mount('https://', HTTPAdapter(max_retries=2))
            payload = json.dumps(data['node_data'])
            # timeout=(connect timeout, read timeout)
            if data['action'] == 'delete':
                response = session.delete(url=data['node_url'], data=payload,
                                          headers=data['headers'], timeout=(180, 180))
            else:
                response = session.post(url=data['node_url'], data=payload,
                                        headers=data['headers'], timeout=(180, 180))
            logging.info('time_node_get_response')
            if common.check_json(response):
                res = response.json()
                logging.info('res:%s' % res)
                val = {'model_name': 'ao.flight.order', 'model_description': '航空运单(时间节点)',
                       'trigger_description': data['trigger_description'], 'model_id': data['model_ids']}
                if res['code'] == 31200:
                    val['success_bl'] = True
                    # Flight departed: record the waybills so single-shipment
                    # pushes are skipped afterwards.
                    if data['change_state'] == '航班起飞':
                        val['waybill_ids'] = data['model_ids']
                elif (res['code'] == 31205 and data['action'] == 'post'
                        and data.get('request_num') and data['request_num'] < 4):
                    # Retry creating the time node up to 3 times — the lading
                    # bill may be created during that window.
                    data['request_num'] += 1
                    r.lpush('time_node_list', json.dumps(data))
                    return []
                else:
                    val['success_bl'] = False
                    val['error_msg'] = res.get('message') or ''
                    val['remark'] = res.get('fail_nodes') or ''
                res_data = val
            else:
                logging.info('response text:%s' % response.text)
        except Exception as ex:
            logging.error('time_node push error:%s' % str(ex))
        return res_data
try:
    # Connect to redis and run the blocking consume loop forever.
    connection_pool = redis.ConnectionPool(**config.redis_options)
    redis_client = redis.Redis(connection_pool=connection_pool)
    logging.info(u'redis连接成功')
    dispatcher = Order_dispose()
    while True:
        try:
            # brpop blocks until a job arrives; returns (queue_name, payload).
            _, raw_job = redis_client.brpop('time_node_list', 0)
            job = json.loads(raw_job)
            logging.info(job)
            log_record = dispatcher.order_data(job, redis_client)
            if log_record:
                logging.info('推送数据...%s', log_record)
                redis_client.lpush('yhj_api_log_response_data', json.dumps(log_record))
        except Exception as e:
            # Keep the consumer alive: log and move on to the next job.
            logging.error(e)
            continue
except Exception as e:
    logging.error("登录失败")
    logging.error(e)
# -*- coding: utf-8 -*-
import json
import random
import datetime
from Crypto.Cipher import AES
import logging
# AES block size in bytes (also the CBC IV length).
IV_SIZE = 16


class AoUtil(object):
    """Shared helpers: AES-CBC encryption, random strings, time shifting,
    and a JSON-response sanity check."""

    def add_bytes_count(self, data):
        """Return the number of PKCS#7 padding bytes *data* needs to reach a
        multiple of IV_SIZE (a full extra block when already aligned)."""
        remainder = len(data.encode('utf-8')) % IV_SIZE
        return (IV_SIZE - remainder) if remainder else IV_SIZE

    def aes_encrypt(self, s_key, iv, pt):
        """AES-CBC encrypt plaintext *pt* (str).

        :param s_key: 16-byte key as bytes.
        :param iv: 16-byte initialisation vector as bytes.
        :param pt: plain text to encrypt.
        :return: ciphertext as bytes.
        """
        pad_len = self.add_bytes_count(pt)
        # PKCS#7: pad with `pad_len` copies of chr(pad_len).
        padded = pt + chr(pad_len) * pad_len
        cipher = AES.new(s_key, AES.MODE_CBC, iv)
        return cipher.encrypt(padded.encode('utf-8'))

    def cryptData(self, data, secretKey):
        """JSON-encode *data*, AES-CBC encrypt it, and return hex text.

        The first 16 characters of *secretKey* are the key, the remainder
        the IV — assumes the remainder is exactly 16 chars; confirm callers.
        """
        serialized = json.dumps(data)
        encrypted = self.aes_encrypt(secretKey[0:16].encode("utf8"),
                                     secretKey[16:].encode("utf8"), serialized)
        return encrypted.hex()

    def random_str(self, num=6):
        """Return *num* distinct random alphanumeric characters as a string."""
        alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
        return ''.join(random.sample(alphabet, num))

    def get_odoo_time(self, parse_time):
        """Shift a 'YYYY-mm-dd HH:MM:SS' timestamp back by 8 hours
        (presumably local UTC+8 → UTC) and return it in the same format."""
        parsed = datetime.datetime.strptime(parse_time, "%Y-%m-%d %H:%M:%S")
        shifted = parsed - datetime.timedelta(hours=8)
        return shifted.strftime("%Y-%m-%d %H:%M:%S")

    def check_json(self, input_str):
        """Return True when *input_str* (a requests Response) parses as JSON,
        False otherwise (the parse error is logged at INFO level)."""
        try:
            input_str.json()
            return True
        except Exception as e:
            logging.info(e)
            return False
# -*- coding: utf-8 -*-
# Part of SmartGo. See LICENSE file for full copyright and licensing details.
import json
import logging
import psycopg2
import re
import requests
import time
from datetime import datetime, timedelta
from odoo import models, fields, api, tools
from odoo.exceptions import ValidationError, Warning
_logger = logging.getLogger(__name__)
class HHApi(models.Model):
    # Odoo model wrapping the yhj interface.
    _name = "hh.api"
    _description = 'yhj接口类'

    # Push small-parcel package tracking back to the carrier and log results.
    def ship_package_track(self, ship_package_objs, user_id=False):
        """Send tracking callbacks for each package and record sync logs.

        :param ship_package_objs: cc.ship.package recordset to sync.
        :param user_id: operator recorded in the sync log (defaults to False).

        NOTE(review): as written this method cannot run — see inline notes
        (missing `config` import, placeholder-less `.format()` SQL, `self.*`
        attribute reads that look like they should be `ship.*`).
        """
        try:
            # Connect to redis (currently disabled)
            # redis_conn = self.env['ao.common'].sudo().get_redis()
            # if redis_conn == 'no':
            # raise ValidationError('请先配置redis')
            for ship in ship_package_objs:
                data = ship.get_callback_track_data()
                tt_api_obj = self.env["ao.tt.api"].sudo()
                response = tt_api_obj.callback_track(data)
                logging.info('callback_track response:%s' % response)
                response_data = response.json()
                # redis_conn.lpush('flight_order_list', json.dumps(vals))
                # NOTE(review): `config` is not imported in this module, so
                # this line raises NameError; also a new connection is opened
                # every iteration and never closed — confirm and fix.
                pg_conn = psycopg2.connect(**config.postgresql_options)
                cursor = pg_conn.cursor()
                if response_data['code'] != 0:
                    # Callback rejected: un-mark the package as synced.
                    sql2 = "update cc_ship_package set is_sync=false where id = {0};".format(ship.id)
                    cursor.execute(sql2)
                    error_msg = response_data['msg']
                    request_id = response_data['requestID']
                    # track_vals = {
                    # 'big_bag_no': big_bag_no,
                    # 'error_msg': error_msg if error_msg and error_msg != 'success' else '',
                    # 'push_time': datetime.utcnow(),
                    # 'data_text': data_text,
                    # 'success_bl': success_bl,
                    # 'request_id': request_id,
                    # 'source': '推出'
                    # }
                    # NOTE(review): `values ();` contains no {} placeholders,
                    # so .format() discards every argument and the SQL is
                    # invalid; `self.tracking_no` also reads an attribute of
                    # the api model — presumably `ship.tracking_no` intended.
                    insert_sql = """insert into ao_tt_api_log(big_bag_no,push_time,error_msg,success_bl,data_text,request_id,source) values ();""".format(
                        self.tracking_no or '', datetime.utcnow(), '小包状态轨迹回传:' + error_msg, data, False,
                        request_id, '推出')
                    cursor.execute(insert_sql)
                    pg_conn.commit()
                else:
                    # Callback accepted: mark the package as synced.
                    sql2 = "update cc_ship_package set is_sync=true where id = {0};".format(ship.id)
                    cursor.execute(sql2)
                    # NOTE(review): same placeholder-less .format() problem;
                    # `self.id` / `self.process_time` look like `ship.*`.
                    ship_insert_sql = """insert into cc_ship_package_sync_log(package_id,sync_time,api_customer,process_code,operate_time,operate_remark,operate_user) values ();""".format(
                        self.id, self.process_time.strftime(
                            '%Y-%m-%d %H:%M:%S'), 'Tiktok',
                        ship.state.tk_code, fields.Datetime.now(),
                        ship.state_explain, user_id)
                    cursor.execute(ship_insert_sql)
                    pg_conn.commit()
                    request_id = response_data['requestID']
                    # NOTE(review): same placeholder-less .format() problem.
                    insert_sql = """insert into ao_tt_api_log(big_bag_no,push_time,error_msg,success_bl,data_text,request_id,source) values ();""".format(
                        self.tracking_no or '', datetime.utcnow(), '', data, True, request_id, '推出')
                    cursor.execute(insert_sql)
                    pg_conn.commit()
        except Exception as ex:
            _logger.error('包裹回传接口返回错误:%s' % str(ex))
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论