提交 c671cc29 authored 作者: 刘擎阳's avatar 刘擎阳

1.历史数据同步

上级 5e94616d
......@@ -24,8 +24,8 @@
'wizard/add_exception_info_wizard_views.xml',
'wizard/email_template.xml',
'data/data.xml',
'data/timer.xml',
'data/sequence.xml',
'views/menu_view.xml',
'views/cc_clearance_file_view.xml',
'views/cc_node_view.xml',
'views/cc_last_mile_provider_views.xml',
......@@ -41,6 +41,7 @@
'views/cc_history_package_good_view.xml',
'views/cc_history_ship_package_view.xml',
'views/cc_history_package_sync_log_view.xml',
'views/menu_view.xml',
# 'views/cc_customers_declaration_order_view.xml',
'templates/login.xml',
],
......
<odoo>
    <data>
        <!-- Scheduled action: once a day, cc.bl.cron_update_history_data()
             pushes a history-archiving job onto the Redis queue consumed by
             the standalone history_data.py worker. Shipped disabled
             (active=False); enable it only after the consumer is running. -->
        <record id="cron_update_history_data" model="ir.cron">
            <field name="name">同步历史数据</field>
            <field name="model_id" ref="ccs_base.model_cc_bl"/>
            <field name="state">code</field>
            <field name="code">model.cron_update_history_data()</field>
            <field name='interval_number'>1</field>
            <field name='interval_type'>days</field>
            <field name="numbercall">-1</field>
            <field name="active" eval="False"/>
        </record>
    </data>
</odoo>
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -2,7 +2,7 @@
# 导入日志
import logging
from datetime import timedelta, datetime
import json
import pytz
from odoo import models, fields, api, _
from odoo.exceptions import UserError, ValidationError
......@@ -634,156 +634,17 @@ class CcBL(models.Model):
item.big_package_ids.filtered(
lambda package: package.tally_state == 'handover_completed' and not package.is_cancel))
def history_func(self):
    """Archive bills of lading older than 180 days into the history tables.

    For each matched bill, copies its big packages, ship packages, sync
    logs and goods into the corresponding ``cc.history.*`` models, then
    marks the bill as historical so the next run skips it.

    Fixes over the previous version:
    - sync logs were created in ``cc.history.package.good`` (wrong model)
      and linked to the *original* package id instead of the history one;
    - ``tracking_no`` was copied from ``logistic_order_no``;
    - m2m fields were passed raw recordsets instead of ``(6, 0, ids)``
      commands, and ``currency_id`` a recordset instead of an id;
    - processed bills are now flagged ``is_history`` (previously they were
      re-selected on every run);
    - debug ``print`` removed.

    :return: None
    """
    current_date = datetime.today()
    # Bills created more than 180 days ago are considered historical.
    past_date = current_date - timedelta(days=180)
    # One bill per call keeps the transaction small; the cron re-runs.
    bl_objs = self.env['cc.bl'].sudo().search(
        [('is_history', '=', False), ('create_date', '<', past_date)],
        limit=1)
    for bl_obj in bl_objs:
        for big_package_obj in bl_obj.big_package_ids:
            create_big_obj = self.env['cc.history.big.package'].sudo().create({
                'origin_id': big_package_obj.id,
                'bl_id': big_package_obj.bl_id.id,
                'big_package_no': big_package_obj.big_package_no,
                'next_provider_name': big_package_obj.next_provider_name,
                'ship_package_qty': big_package_obj.ship_package_qty,
                'goods_qty': big_package_obj.goods_qty,
                'pallet_number': big_package_obj.pallet_number,
                'pallet_usage_date': big_package_obj.pallet_usage_date,
                'is_cancel': big_package_obj.is_cancel,
                'tally_state': big_package_obj.tally_state,
                'tally_user_id': big_package_obj.tally_user_id.id,
                'tally_time': big_package_obj.tally_time,
                'delivery_user_id': big_package_obj.delivery_user_id.id,
                'delivery_time': big_package_obj.delivery_time,
                # m2m: link the existing exception records to the history row
                'exception_info_ids': [(6, 0, big_package_obj.exception_info_ids.ids)],
            })
            for ship_package_obj in big_package_obj.ship_package_ids:
                create_ship_obj = self.env['cc.history.ship.package'].sudo().create({
                    'customer_id': ship_package_obj.customer_id.id,
                    'origin_id': ship_package_obj.id,
                    'bl_id': ship_package_obj.bl_id.id,
                    'big_package_id': create_big_obj.id,
                    'logistic_order_no': ship_package_obj.logistic_order_no,
                    # was copied from logistic_order_no by mistake
                    'tracking_no': ship_package_obj.tracking_no,
                    'customer_ref': ship_package_obj.customer_ref,
                    'internal_account_number': ship_package_obj.internal_account_number,
                    'user_track_note': ship_package_obj.user_track_note,
                    'company_code': ship_package_obj.company_code,
                    'trade_no': ship_package_obj.trade_no,
                    'big_package_no': ship_package_obj.big_package_no,
                    'container_no': ship_package_obj.container_no,
                    'buyer_region': ship_package_obj.buyer_region,
                    'next_provider_name': ship_package_obj.next_provider_name,
                    'sender_name': ship_package_obj.sender_name,
                    'sender_vat_no': ship_package_obj.sender_vat_no,
                    'sender_phone': ship_package_obj.sender_phone,
                    'sender_add_1': ship_package_obj.sender_add_1,
                    'sender_add_2': ship_package_obj.sender_add_2,
                    'sender_add_3': ship_package_obj.sender_add_3,
                    'sender_city': ship_package_obj.sender_city,
                    'sender_state': ship_package_obj.sender_state,
                    'sender_postcode': ship_package_obj.sender_postcode,
                    'sender_country': ship_package_obj.sender_country,
                    'receiver_name': ship_package_obj.receiver_name,
                    'receiver_type': ship_package_obj.receiver_type,
                    'receiver_vat_no': ship_package_obj.receiver_vat_no,
                    'receiver_add_1': ship_package_obj.receiver_add_1,
                    'receiver_add_2': ship_package_obj.receiver_add_2,
                    'receiver_add_3': ship_package_obj.receiver_add_3,
                    'receiver_city': ship_package_obj.receiver_city,
                    'receiver_county': ship_package_obj.receiver_county,
                    'receiver_county_translate': ship_package_obj.receiver_county_translate,
                    'receiver_postcode': ship_package_obj.receiver_postcode,
                    'receiver_email': ship_package_obj.receiver_email,
                    'receiver_phone': ship_package_obj.receiver_phone,
                    'gross_weight': ship_package_obj.gross_weight,
                    'weight_unit': ship_package_obj.weight_unit,
                    'currency': ship_package_obj.currency,
                    # m2o values must be ids, not recordsets
                    'currency_id': ship_package_obj.currency_id.id,
                    'total_value': ship_package_obj.total_value,
                    'shipping_fee': ship_package_obj.shipping_fee,
                    'tax_mark': ship_package_obj.tax_mark,
                    'actual_tax': ship_package_obj.actual_tax,
                    'actual_vat': ship_package_obj.actual_vat,
                    'actual_gst': ship_package_obj.actual_gst,
                    'actual_tax_currency': ship_package_obj.actual_tax_currency,
                    'actual_currency_id': ship_package_obj.actual_currency_id.id,
                    'actual_tax_date': ship_package_obj.actual_tax_date,
                    'actual_tax_tz': ship_package_obj.actual_tax_tz,
                    'is_cancel': ship_package_obj.is_cancel,
                    'state': ship_package_obj.state,
                    'node_exception_reason_id': ship_package_obj.node_exception_reason_id.id,
                    'process_time': ship_package_obj.process_time,
                    'cancel_reason': ship_package_obj.cancel_reason,
                    # m2m links rebuilt with (6, 0, ids) commands
                    'exception_info_ids': [(6, 0, ship_package_obj.exception_info_ids.ids)],
                    'invoice_attachment_ids': [(6, 0, ship_package_obj.invoice_attachment_ids.ids)],
                    'operation_time': ship_package_obj.operation_time,
                    'state_explain': ship_package_obj.state_explain,
                    'is_sync': ship_package_obj.is_sync,
                    'tk_code': ship_package_obj.tk_code,
                })
                for sync_log_obj in ship_package_obj.sync_log_ids:
                    # Push logs go to the history *sync log* model, linked
                    # to the freshly created history package.
                    self.env['cc.history.package.sync.log'].sudo().create({
                        'package_id': create_ship_obj.id,
                        'sync_time': sync_log_obj.sync_time,
                        'api_customer': sync_log_obj.api_customer,
                        'process_code': sync_log_obj.process_code,
                        'operate_time': sync_log_obj.operate_time,
                        'operate_remark': sync_log_obj.operate_remark,
                        'operate_user': sync_log_obj.operate_user,
                    })
                for goods_obj in ship_package_obj.goods_ids:
                    self.env['cc.history.package.good'].sudo().create({
                        'origin_id': goods_obj.id,
                        # history good points at the *history* package/big package
                        'bl_line_id': create_ship_obj.id,
                        'big_package_id': create_big_obj.id,
                        'bl_id': goods_obj.bl_id.id,
                        'item_id': goods_obj.item_id,
                        'sku_id': goods_obj.sku_id,
                        'item_name_cn': goods_obj.item_name_cn,
                        'item_name_en': goods_obj.item_name_en,
                        'export_hs_code': goods_obj.export_hs_code,
                        'import_hs_code': goods_obj.import_hs_code,
                        'weight': goods_obj.weight,
                        'quantity': goods_obj.quantity,
                        'quantity_unit': goods_obj.quantity_unit,
                        'declare_price': goods_obj.declare_price,
                        'freight': goods_obj.freight,
                        'cod_amount': goods_obj.cod_amount,
                        'vat_rate': goods_obj.vat_rate,
                        'item_vat': goods_obj.item_vat,
                        'origin_country': goods_obj.origin_country,
                        'item_type': goods_obj.item_type,
                        'item_total_price': goods_obj.item_total_price,
                        'item_link': goods_obj.item_link,
                        'item_tax_status': goods_obj.item_tax_status,
                        'actual_tax': goods_obj.actual_tax,
                        'actual_tax_rate': goods_obj.actual_tax_rate,
                        'actual_tax_currency': goods_obj.actual_tax_currency,
                        'actual_vat': goods_obj.actual_vat,
                        'actual_vat_rate': goods_obj.actual_vat_rate,
                        'actual_gst': goods_obj.actual_gst,
                        'actual_gst_rate': goods_obj.actual_gst_rate,
                        'currency_id': goods_obj.currency_id.id,
                        'is_cancel': goods_obj.is_cancel,
                    })
        # Flag the bill so it is not selected again on the next run.
        bl_obj.is_history = True
def cron_update_history_data(self):
    """Queue a history-archiving job for the external consumer.

    Reads tunables from ``ir.config_parameter`` and pushes them as a JSON
    payload onto the ``history_data_list`` Redis list, which the
    standalone ``history_data.py`` worker pops with ``brpop``.

    :return: None
    """
    get_param = self.env['ir.config_parameter'].sudo().get_param
    # get_param returns strings (or False when unset); normalise to int so
    # the consumer receives well-typed values instead of e.g. "180".
    vals = {
        'history_days': int(get_param('history_days') or 180),
        'history_limit': int(get_param('history_limit') or 50),
        'origin_delete': int(get_param('origin_delete') or 1),
    }
    redis_conn = self.env['common.common'].sudo().get_redis()
    redis_conn.lpush('history_data_list', json.dumps(vals))
is_history = fields.Boolean('历史单据', default=False)
# 提单号
......
......@@ -29,7 +29,7 @@ class CcHistoryShipPackage(models.Model):
bl_id = fields.Many2one('cc.bl', string='Bill of Loading', index=True)
# 大包号
big_package_id = fields.Many2one('cc.big.package', string='Big Package', index=True)
big_package_id = fields.Many2one('cc.history.big.package', string='Big Package', index=True)
# 物流订单号
logistic_order_no = fields.Char(string='Logistic Order No', index=True)
......
......@@ -70,7 +70,7 @@ access_cc_clearance_file_ccs_base.group_clearance_of_customs_user,cc_clearance_f
order_state_change_rule_group_user,order_state_change_rule_group_user,ccs_base.model_order_state_change_rule,base.group_user,1,1,1,1
access_cc_history_big_package_base.group_user,cc_history_big_package base.group_user,ccs_base.model_cc_history_big_package,base.group_user,1,1,1,0
access_cc_history_package_good_base.group_user,cc_history_package_good base.group_user,ccs_base.model_cc_history_package_good,base.group_user,1,1,1,0
access_cc_history_ship_package_base.group_user,cc_history_ship_package base.group_user,ccs_base.model_cc_history_ship_package,base.group_user,1,1,1,0
access_cc_history_package_sync_log_base.group_user,cc_history_package_sync_log base.group_user,ccs_base.model_cc_history_package_sync_log,base.group_user,1,1,1,0
\ No newline at end of file
access_cc_history_big_package_base.group_user,cc_history_big_package base.group_user,ccs_base.model_cc_history_big_package,base.group_user,1,1,1,1
access_cc_history_package_good_base.group_user,cc_history_package_good base.group_user,ccs_base.model_cc_history_package_good,base.group_user,1,1,1,1
access_cc_history_ship_package_base.group_user,cc_history_ship_package base.group_user,ccs_base.model_cc_history_ship_package,base.group_user,1,1,1,1
access_cc_history_package_sync_log_base.group_user,cc_history_package_sync_log base.group_user,ccs_base.model_cc_history_package_sync_log,base.group_user,1,1,1,1
\ No newline at end of file
......@@ -106,7 +106,13 @@
</div>
<widget name="web_ribbon" bg_color="bg-warning" title="Cancelled"
attrs="{'invisible': [('is_cancel', '=', False)]}"/>
<br/>
<br/>
<br/>
<br/>
<widget name="web_ribbon" bg_color="bg-danger" title="历史提单"
attrs="{'invisible': [('is_history', '=', False)]}"/>
<field name="is_history" invisible="1"/>
<label for="bl_no"/>
<h1>
<field name="bl_no" readonly="True"/>
......
......@@ -158,7 +158,7 @@
</field>
</record>
<menuitem parent="" sequence="20" name="History Big Package" id="menu_cc_history_big_package" action="action_cc_history_big_package"/>
<!-- <record id="big_package_add_exception_info_server_action" model="ir.actions.server">-->
......
......@@ -184,6 +184,6 @@
</field>
</record>
<menuitem parent="" sequence="25" name="History Package Goods" id="menu_cc_history_package_good" action="action_cc_history_package_good"/>
</odoo>
\ No newline at end of file
......@@ -333,7 +333,7 @@
</field>
</record>
<menuitem parent="" sequence="23" name="History Ship Package" id="menu_cc_history_ship_package" action="action_cc_history_ship_package"/>
<!-- <record id="ship_package_add_exception_info_server_action" model="ir.actions.server">-->
......
......@@ -51,5 +51,15 @@
<menuitem sequence="20" name="Clearance Company" id="menu_cc_clearance_company"
action="action_cc_is_clearance_company"
parent="menu_ccs_base_main"/>
<menuitem sequence="25" name="History Data" id="menu_cc_history_data"
/>
<menuitem parent="menu_cc_history_data" sequence="1" name="History Big Package" id="menu_cc_history_big_package" action="action_cc_history_big_package"/>
<menuitem parent="menu_cc_history_data" sequence="5" name="History Package Goods" id="menu_cc_history_package_good" action="action_cc_history_package_good"/>
<menuitem parent="menu_cc_history_data" sequence="3" name="History Ship Package" id="menu_cc_history_ship_package" action="action_cc_history_ship_package"/>
</data>
</odoo>
\ No newline at end of file
......@@ -13,6 +13,14 @@ redis_options = dict(
db=0
)
# PostgreSQL connection settings for the history-data consumer.
# NOTE(review): credentials are hard-coded and committed to the repository —
# move host/user/password to environment variables or a secrets store.
postgresql_options = dict(
    host="127.0.0.1",
    port=5431,
    database="hh_ccs_test",
    user="odoo14",
    password="qq166349",
)
# 测试
# db_ip = "121.199.167.133"
# db_port = "8369"
......
# coding=utf-8
import json
import logging
import psycopg2
import redis
from sqlalchemy import create_engine
import config
from datetime import timedelta, datetime
import pandas as pd
# from line_profiler import LineProfiler
# 创建一个 LineProfiler 实例
# profiler = LineProfiler()
# 默认字符gbk
logging.basicConfig(filename='logs/history_data_logger.log', level=logging.INFO)
# 设置文件字符为utf-8
# logging.basicConfig(handlers=[logging.FileHandler('logs/mail_push.log', 'a', 'utf-8')],
# format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
class Order_dispose(object):
def __init__(self):
    """Open both a SQLAlchemy engine and a raw psycopg2 connection."""
    print('new connection')
    opts = config.postgresql_options
    # Same credentials feed both handles: the engine drives pandas
    # read_sql/to_sql, the psycopg2 connection is kept for direct use.
    db_url = 'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}'.format(**opts)
    self.conn_engine = create_engine(db_url)
    self.pg_conn = psycopg2.connect(
        database=opts['database'],
        user=opts['user'],
        password=opts['password'],
        host=opts['host'],
        port=opts['port'],
    )
def get_init_data(self, days):
    """Prepare one sync run.

    :param days: age threshold — bills created before ``today - days``
        qualify for archiving.
    :return: (psycopg2 connection, cursor, SQLAlchemy engine, cutoff datetime)
    """
    conn = psycopg2.connect(**config.postgresql_options)
    cur = conn.cursor()
    cutoff = datetime.today() - timedelta(days=days)
    return conn, cur, self.conn_engine, cutoff
def history_big_package(self, db_handle, big_package_data):
    """Create one history big-package row from an original big package.

    :param db_handle: SQLAlchemy engine used for every read/write here
    :param big_package_data: namedtuple row of the original ``cc_big_package``
    :return: id of the new ``cc_history_big_package`` row
    """
    # Copy the scalar columns 1:1; the m2m link table is rebuilt below.
    big_package_vals = {
        'origin_id': big_package_data.id,
        'bl_id': big_package_data.bl_id,
        'big_package_no': big_package_data.big_package_no,
        'next_provider_name': big_package_data.next_provider_name,
        'ship_package_qty': big_package_data.ship_package_qty,
        'goods_qty': big_package_data.goods_qty,
        'pallet_number': big_package_data.pallet_number,
        'pallet_usage_date': big_package_data.pallet_usage_date,
        'is_cancel': big_package_data.is_cancel,
        'tally_state': big_package_data.tally_state,
        'tally_user_id': big_package_data.tally_user_id,
        'tally_time': big_package_data.tally_time,
        'delivery_user_id': big_package_data.delivery_user_id,
        'delivery_time': big_package_data.delivery_time,
        # exception_info_ids / goods_ids / ship_package_ids are relational
        # and handled separately (see the INSERT ... SELECT below and the
        # ship-package / goods helpers).
    }
    val_df = pd.DataFrame(big_package_vals, index=[0])
    val_df.to_sql('cc_history_big_package', con=db_handle, if_exists='append', index=False)
    # Recover the id that to_sql just generated by looking up origin_id.
    # NOTE(review): if the same origin were ever archived twice this picks
    # the first match — presumably origin_id is unique; verify.
    sql = "select id from cc_history_big_package where origin_id=%s;" % big_package_data.id
    new_order = pd.read_sql(sql, con=db_handle)
    history_big_package_id = new_order.to_dict()['id'][0]
    # Re-point the exception-info m2m rows at the new history record.
    insert_sql = 'insert into history_bigpackage_exception_info_rel (cc_history_big_package_id, cc_exception_info_id) ' \
                 'select %s, cc_exception_info_id from bigpackage_exception_info_rel where ' \
                 'cc_big_package_id=%s;' % (history_big_package_id, big_package_data.id)
    # NOTE(review): read_sql is being used to run an INSERT; with chunksize
    # it returns a lazy iterator that is never consumed — confirm the
    # statement actually executes on this pandas/SQLAlchemy version.
    pd.read_sql(insert_sql, con=db_handle, chunksize=100)
    return history_big_package_id
def history_ship_package(self, db_handle, big_id, ship_package_data):
    """Create one history ship-package row from an original ship package.

    :param db_handle: SQLAlchemy engine
    :param big_id: id of the already-created history big package
    :param ship_package_data: namedtuple row of the original ``cc_ship_package``
    :return: id of the new ``cc_history_ship_package`` row
    """
    # Straight column-for-column copy; relational data is rebuilt below.
    history_ship_package_vals = {
        'customer_id': ship_package_data.customer_id,
        'origin_id': ship_package_data.id,
        'bl_id': ship_package_data.bl_id,
        'big_package_id': big_id,
        'logistic_order_no': ship_package_data.logistic_order_no,
        'tracking_no': ship_package_data.tracking_no,
        'customer_ref': ship_package_data.customer_ref,
        'internal_account_number': ship_package_data.internal_account_number,
        'user_track_note': ship_package_data.user_track_note,
        'company_code': ship_package_data.company_code,
        'trade_no': ship_package_data.trade_no,
        'big_package_no': ship_package_data.big_package_no,
        'container_no': ship_package_data.container_no,
        'buyer_region': ship_package_data.buyer_region,
        'next_provider_name': ship_package_data.next_provider_name,
        'sender_name': ship_package_data.sender_name,
        'sender_vat_no': ship_package_data.sender_vat_no,
        'sender_phone': ship_package_data.sender_phone,
        'sender_add_1': ship_package_data.sender_add_1,
        'sender_add_2': ship_package_data.sender_add_2,
        'sender_add_3': ship_package_data.sender_add_3,
        'sender_city': ship_package_data.sender_city,
        'sender_state': ship_package_data.sender_state,
        'sender_postcode': ship_package_data.sender_postcode,
        'sender_country': ship_package_data.sender_country,
        'receiver_name': ship_package_data.receiver_name,
        'receiver_type': ship_package_data.receiver_type,
        'receiver_vat_no': ship_package_data.receiver_vat_no,
        'receiver_add_1': ship_package_data.receiver_add_1,
        'receiver_add_2': ship_package_data.receiver_add_2,
        'receiver_add_3': ship_package_data.receiver_add_3,
        'receiver_city': ship_package_data.receiver_city,
        'receiver_county': ship_package_data.receiver_county,
        'receiver_county_translate': ship_package_data.receiver_county_translate,
        'receiver_postcode': ship_package_data.receiver_postcode,
        'receiver_email': ship_package_data.receiver_email,
        'receiver_phone': ship_package_data.receiver_phone,
        'gross_weight': ship_package_data.gross_weight,
        'weight_unit': ship_package_data.weight_unit,
        'currency': ship_package_data.currency,
        'currency_id': ship_package_data.currency_id,
        'total_value': ship_package_data.total_value,
        'shipping_fee': ship_package_data.shipping_fee,
        'tax_mark': ship_package_data.tax_mark,
        'actual_tax': ship_package_data.actual_tax,
        'actual_vat': ship_package_data.actual_vat,
        'actual_gst': ship_package_data.actual_gst,
        'actual_tax_currency': ship_package_data.actual_tax_currency,
        'actual_currency_id': ship_package_data.actual_currency_id,
        'actual_tax_date': ship_package_data.actual_tax_date,
        'actual_tax_tz': ship_package_data.actual_tax_tz,
        'is_cancel': ship_package_data.is_cancel,
        'state': ship_package_data.state,
        'node_exception_reason_id': ship_package_data.node_exception_reason_id,
        'process_time': ship_package_data.process_time,
        'cancel_reason': ship_package_data.cancel_reason,
        # exception_info_ids / invoice_attachment_ids are m2m and are
        # rebuilt via the two INSERT ... SELECT statements below;
        # good_ids / sync_log_ids are handled by their own helpers.
        'operation_time': ship_package_data.operation_time,
        'state_explain': ship_package_data.state_explain,
        'is_sync': ship_package_data.is_sync,
        'tk_code': ship_package_data.tk_code,
    }
    val_df = pd.DataFrame(history_ship_package_vals, index=[0])
    val_df.to_sql('cc_history_ship_package', con=db_handle, if_exists='append', index=False)
    # Recover the freshly generated id via origin_id (see note in
    # history_big_package about uniqueness).
    sql = "select id from cc_history_ship_package where origin_id=%s;" % ship_package_data.id
    new_order = pd.read_sql(sql, con=db_handle)
    history_ship_package_id = new_order.to_dict()['id'][0]
    # Re-point the exception-info m2m rows at the new history record.
    insert_sql = 'insert into history_package_exception_info_rel (cc_history_ship_package_id, cc_exception_info_id) ' \
                 'select %s, cc_exception_info_id from shippackage_exception_info_rel where ' \
                 'cc_ship_package_id=%s;' % (history_ship_package_id, ship_package_data.id)
    # NOTE(review): INSERT executed through read_sql with a never-consumed
    # chunksize iterator — confirm it runs on this pandas/SQLAlchemy version.
    pd.read_sql(insert_sql, con=db_handle, chunksize=100)
    # Same for the invoice-attachment m2m rows.
    insert_sql = 'insert into history_package_invoice_attachment_rel (cc_history_ship_package_id, ir_attachment_id) ' \
                 'select %s, ir_attachment_id from ship_package_invoice_attachment_rel where ' \
                 'cc_ship_package_id=%s;' % (history_ship_package_id, ship_package_data.id)
    pd.read_sql(insert_sql, con=db_handle, chunksize=100)
    return history_ship_package_id
def history_sync_log(self, db_handle, history_ship_package_id, ship_package_id):
    """Copy the push logs of one ship package into the history log table.

    :param db_handle: SQLAlchemy engine
    :param history_ship_package_id: id of the new history ship package
    :param ship_package_id: id of the original ship package
    :return: list of original log ids (deletion candidates for the caller)
    """
    sync_sql = "SELECT id, sync_time, api_customer, process_code, operate_time, operate_remark, operate_user FROM cc_ship_package_sync_log WHERE package_id = %s;"
    log_frame = pd.read_sql(sync_sql, con=db_handle, params=(ship_package_id,))
    # Collect history rows and original ids in a single pass.
    history_rows = []
    origin_log_ids = []
    for log_row in log_frame.itertuples():
        origin_log_ids.append(log_row.id)
        history_rows.append({
            'package_id': history_ship_package_id,
            'sync_time': log_row.sync_time,
            'api_customer': log_row.api_customer,
            'process_code': log_row.process_code,
            'operate_time': log_row.operate_time,
            'operate_remark': log_row.operate_remark,
            'operate_user': log_row.operate_user,
        })
    if history_rows:
        pd.DataFrame(history_rows).to_sql(
            'cc_history_package_sync_log', con=db_handle,
            if_exists='append', index=False)
    return origin_log_ids
def history_package_goods(self, db_handle, ship_package_id, history_ship_package_id, big_id, select_bl_id):
    """Copy the goods of one ship package into the history goods table.

    :param db_handle: SQLAlchemy engine
    :param ship_package_id: original ship-package id
    :param history_ship_package_id: id of the new history ship package
    :param big_id: id of the new history big package
    :param select_bl_id: original bill-of-lading id
    :return: list of original goods ids (deletion candidates for the caller)
    """
    # Goods of the original package; id is parameterized (no string
    # formatting) to keep the query injection-safe.
    package_good_sql = """select id, bl_line_id, big_package_id, bl_id, item_id, sku_id,
                       item_name_cn, item_name_en, export_hs_code, import_hs_code, weight,
                       quantity, quantity_unit, declare_price, freight, cod_amount, vat_rate,
                       item_vat, origin_country, item_type, item_total_price, item_link,
                       item_tax_status, actual_tax, actual_tax_rate, actual_tax_currency,
                       actual_vat, actual_vat_rate, actual_gst, actual_gst_rate, currency_id,
                       is_cancel from cc_package_good where bl_line_id=%s;"""
    package_good_result_arr = pd.read_sql(package_good_sql, con=db_handle,
                                          params=(ship_package_id,))
    package_good_vals_list = []
    for package_good_data in package_good_result_arr.itertuples():
        package_good_vals = {
            'origin_id': package_good_data.id,
            # relink to the history package / big package / original bill
            'bl_line_id': history_ship_package_id,
            'big_package_id': big_id,
            'bl_id': select_bl_id,
            'item_id': package_good_data.item_id,
            'sku_id': package_good_data.sku_id,
            'item_name_cn': package_good_data.item_name_cn,
            'item_name_en': package_good_data.item_name_en,
            'export_hs_code': package_good_data.export_hs_code,
            'import_hs_code': package_good_data.import_hs_code,
            'weight': package_good_data.weight,
            'quantity': package_good_data.quantity,
            'quantity_unit': package_good_data.quantity_unit,
            'declare_price': package_good_data.declare_price,
            'freight': package_good_data.freight,
            'cod_amount': package_good_data.cod_amount,
            'vat_rate': package_good_data.vat_rate,
            'item_vat': package_good_data.item_vat,
            'origin_country': package_good_data.origin_country,
            'item_type': package_good_data.item_type,
            'item_total_price': package_good_data.item_total_price,
            'item_link': package_good_data.item_link,
            'item_tax_status': package_good_data.item_tax_status,
            'actual_tax': package_good_data.actual_tax,
            'actual_tax_rate': package_good_data.actual_tax_rate,
            'actual_tax_currency': package_good_data.actual_tax_currency,
            'actual_vat': package_good_data.actual_vat,
            'actual_vat_rate': package_good_data.actual_vat_rate,
            'actual_gst': package_good_data.actual_gst,
            'actual_gst_rate': package_good_data.actual_gst_rate,
            'currency_id': package_good_data.currency_id,
            'is_cancel': package_good_data.is_cancel,
        }
        package_good_vals_list.append(package_good_vals)
    if package_good_vals_list:
        # BUG FIX: the previous version passed index=[0], which raises
        # ValueError as soon as a package contains more than one good
        # (index of length 1 vs N rows). Let pandas build the default index.
        val_df = pd.DataFrame(package_good_vals_list)
        val_df.to_sql('cc_history_package_good', con=db_handle, if_exists='append', index=False)
    origin_goods_ids = package_good_result_arr['id'].tolist()
    return origin_goods_ids
def delete_origin_data(self):
    """Placeholder for purging archived source rows.

    Not implemented — deletion currently lives (commented out) inline in
    ``order_data``.
    """
    pass
# @profiler
def order_data(self, data):
    """Process one archiving job popped from the Redis queue.

    :param data: JSON string with ``history_days`` / ``origin_delete`` /
        ``history_limit`` keys, pushed by the Odoo cron.
    :return: list — currently always empty; kept for interface stability.
    """
    res_data = []
    try:
        data = json.loads(data)
        days = int(data.get('history_days', 180))  # default: bills older than 180 days
        origin_delete = int(data.get('origin_delete', 1))  # default: delete source rows
        limit_num = int(data.get('history_limit', 50))  # default: 50 bills per run
        pg_conn, cursor, db_handle, past_date = self.get_init_data(days)
        # Fetch the next batch of non-archived, old-enough bills of lading.
        sql = 'select id,bl_no from cc_bl where (is_history=False or is_history is null) and create_date < %s order by create_date asc limit %s;'
        result_arr = pd.read_sql(sql, con=db_handle, params=(past_date, int(limit_num)))
        for res in result_arr.itertuples():
            select_bl_id = res.id  # original bill-of-lading id
            select_bl_no = res.bl_no
            delete_big_ids = []
            delete_package_ids = []
            delete_good_ids = []
            delete_log_ids = []
            # All big packages under this bill.
            big_sql = 'select id,bl_id,big_package_no,next_provider_name,ship_package_qty,goods_qty,' \
                      'pallet_number,pallet_usage_date,is_cancel,tally_state,tally_user_id,tally_time,' \
                      'delivery_user_id,delivery_time from cc_big_package where bl_id = %s;'
            big_package_result_arr = pd.read_sql(big_sql, con=db_handle, params=(select_bl_id,))
            for big_package_data in big_package_result_arr.itertuples():
                origin_id = big_package_data.id
                # Archive the big package; returns the new history id.
                history_big_id = self.history_big_package(db_handle, big_package_data)
                delete_big_ids.append(origin_id)  # mark original for deletion
                # Ship packages inside the original big package.
                parcel_sql = "select * from cc_ship_package where big_package_id=%s;" % origin_id
                ship_package_result_arr = pd.read_sql(parcel_sql, con=db_handle)
                for ship_package_data in ship_package_result_arr.itertuples():
                    # Archive the ship package.
                    history_ship_package_id = self.history_ship_package(db_handle, history_big_id, ship_package_data)
                    delete_package_ids.append(ship_package_data.id)  # mark original for deletion
                    # Archive its push logs.
                    origin_log_ids = self.history_sync_log(db_handle, history_ship_package_id, ship_package_data.id)
                    delete_log_ids += origin_log_ids
                    # Archive its goods.
                    origin_goods_ids = self.history_package_goods(db_handle, ship_package_data.id, history_ship_package_id, history_big_id,
                                                                  select_bl_id)
                    delete_good_ids += origin_goods_ids
            # Flag the bill as archived so it is skipped next run.
            # NOTE(review): UPDATE executed through read_sql with a lazy,
            # never-consumed chunksize iterator — confirm it actually runs
            # on this pandas/SQLAlchemy version.
            update_sql = 'update cc_bl set is_history=True where id=%s' % select_bl_id
            pd.read_sql(update_sql, con=db_handle, chunksize=100)
            if origin_delete:
                # Deletion of the source rows is currently disabled; the
                # statements are kept for reference.
                # if delete_big_ids:  # delete original big packages
                #     ids = '(%s)' % str(delete_big_ids)[1:-1]
                #     delete_big_sql = 'delete from cc_big_package where id in %s' % ids
                #     pd.read_sql(delete_big_sql, con=db_handle, chunksize=100)
                # if delete_package_ids:  # delete original ship packages
                #     ids = '(%s)' % str(delete_package_ids)[1:-1]
                #     delete_package_sql = 'delete from cc_ship_package where id in %s' % ids
                #     pd.read_sql(delete_package_sql, con=db_handle, chunksize=100)
                # if delete_good_ids:  # delete original package goods
                #     ids = '(%s)' % str(delete_good_ids)[1:-1]
                #     delete_goods_sql = 'delete from cc_package_good where id in %s' % ids
                #     pd.read_sql(delete_goods_sql, con=db_handle, chunksize=100)
                # if delete_log_ids:  # delete original sync logs
                #     ids = '(%s)' % str(delete_log_ids)[1:-1]
                #     delete_log_sql = 'delete from cc_ship_package_sync_log where id in %s' % ids
                #     pd.read_sql(delete_log_sql, con=db_handle, chunksize=100)
                # NOTE(review): the log line below reports deletion counts,
                # but the deletes above are commented out — misleading.
            logging.info(f'同步提单:{select_bl_no},删除大包{len(delete_big_ids)}个,删除小包{len(delete_package_ids)}个,删除小包商品{len(delete_good_ids)}个,删除小包日志{len(delete_log_ids)}条')
    except Exception as ex:
        # Top-level boundary: log and fall through so the consumer loop survives.
        logging.error('create_history_data error:%s' % str(ex))
    return res_data
# Entry point: block on the Redis queue and process each archiving job.
try:
    pool = redis.ConnectionPool(**config.redis_options)
    r = redis.Redis(connection_pool=pool)
    logging.info(u'redis连接成功')
    # BUG FIX: the instance was previously bound to the class's own name
    # (Order_dispose = Order_dispose()), shadowing the class.
    dispatcher = Order_dispose()
    while True:
        try:
            # brpop with timeout 0 blocks until the Odoo cron pushes a job.
            result = r.brpop('history_data_list', 0)
            payload = result[1]
            response_data = dispatcher.order_data(payload)
        except Exception as e:
            # Keep the consumer alive across per-job failures.
            logging.error(e)
            continue
except Exception as e:
    logging.error("登录失败:%s" % e)
[program:history_data_consumer_1]
process_name=%(program_name)s_%(process_num)02d ; 进程名称
directory = /mnt/extra-addons ; 程序的启动目录
command = /usr/bin/python3 /mnt/extra-addons/history_data.py ; 启动命令
autostart = true ; 在 supervisord 启动的时候也自动启动
startsecs = 5 ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true ; 程序异常退出后自动重启
startretries = 3 ; 启动失败自动重试次数,默认是 3
user = root ; 用哪个用户启动
numprocs=1 ; 进程数
redirect_stderr = true ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 20MB ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20 ; stdout 日志文件备份数
; stdout 日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /var/log/supervisor/history_data.log
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论