提交 bf4990f4 authored 作者: 贺阳's avatar 贺阳

1、增加查询一键全扫信息接口、按尾程理货、按尾程交接的接口

上级 ec5b3601
......@@ -11,15 +11,32 @@ class CCLastMileProvider(models.Model):
for record in self:
if record.matching_value:
values = record.matching_value.split('\n')
existing_values = '\n'.join(self.search([('id', '!=', record.id)]).mapped('matching_value')).split('\n')
existing_values = '\n'.join(self.search(
[('id', '!=', record.id)]).mapped('matching_value')).split('\n')
if len(values) != len(set(values)) or any(value in existing_values for value in values):
raise ValidationError(_("Matching values must be unique!"))
@api.constrains('placement_area')
def _check_placement_area_unique(self):
for record in self:
if record.placement_area:
placement_area = self.env['common.common'].process_match_str(
record.placement_area)
# 检查所有记录的处理后值是否有重复
pro_map = self.get_all_placement_area(
[('id', '!=', record.id), ('placement_area', '!=', False)])
pro_id = pro_map.get(placement_area)
if pro_id:
raise ValidationError(
_("Placement Area must be unique (ignoring spaces, slashes, dashes, and case)!"))
logo = fields.Binary('Courier Logo') # 快递logo,英文
name = fields.Char(string='Courier Name', required=True) # 快递名称
abbreviation = fields.Char(string='Abbreviation', required=True) # 简称
tape_color_value = fields.Char(string='Tape Color Value') # 胶带色值
active = fields.Boolean('Active', default=True) # 有效☑️
matching_value = fields.Text(string='Matching Value') # 尾程服务商匹配值
placement_area = fields.Char('Placement Area') # 摆放区域,英文
def match_provider(self, provider_name):
"""Check if the provider name exists in matching values and return the record."""
......@@ -30,3 +47,22 @@ class CCLastMileProvider(models.Model):
if provider_name in record.matching_value.split('\n'):
return record # 返回找到的记录
return False # 如果没有找到,返回 None
def get_all_placement_area(self, domain=[]):
"""
获取所有摆放区域,英文
"""
all_providers = self.sudo().search(domain or [])
provider_map = {
self.env['common.common'].process_match_str(provider.placement_area): provider.id
for provider in all_providers
}
return provider_map
def search_pro_info(self):
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url')
return {
'provider_name': self.name, # 尾程快递名称
'placement_area': self.placement_area or '', # 摆放区域
'logo': "%s/web/image/%s/%s/logo" % (base_url, self._name, self.id) if self.logo else '', # 快递logo
}
......@@ -16,6 +16,16 @@ class CommonCommon(models.Model):
_name = 'common.common'
_description = u'公用基础类'
# 去杠去空格转大写
def process_match_str(self, input_str):
"""
处理匹配的字符串,去除杠和空格,并转换为大写
"""
if input_str:
return input_str.replace('-', '').replace('/', '').replace(' ', '').upper()
return input_str
def get_local_time(self, local_time=None, user_obj=False):
"""获取Odoo时区的时间
Args:
......@@ -30,8 +40,10 @@ class CommonCommon(models.Model):
if not user_obj:
user_obj = self.env.user
user_tz = user_obj.tz or 'UTC'
timezone_offset = self.env['common.common'].sudo().get_time_zone(user_tz)
local_time = local_time + datetime.timedelta(hours=int(timezone_offset))
timezone_offset = self.env['common.common'].sudo(
).get_time_zone(user_tz)
local_time = local_time + \
datetime.timedelta(hours=int(timezone_offset))
return local_time.strftime('%Y-%m-%d %H:%M:%S'), timezone_offset
except Exception as e:
# 如果出现任何错误,返回UTC时间
......@@ -51,8 +63,10 @@ class CommonCommon(models.Model):
if not user_obj:
user_obj = self.env.user
user_tz = user_obj.tz or 'UTC'
timezone_offset = self.env['common.common'].sudo().get_time_zone(user_tz)
local_time = local_time + datetime.timedelta(hours=int(timezone_offset))
timezone_offset = self.env['common.common'].sudo(
).get_time_zone(user_tz)
local_time = local_time + \
datetime.timedelta(hours=int(timezone_offset))
local_tz = pytz.timezone(user_tz)
# 确保时间是本地时区
if local_time.tzinfo is None:
......
......@@ -12,7 +12,10 @@ class ResConfigSettings(models.TransientModel):
_inherit = 'res.config.settings'
before_min = fields.Integer('清关时间取值(早于清关结束)')
package_scan_min = fields.Integer('一键全扫完成时间(min)', help='输入示范:10,即表示在10分钟内大包时间随机分配,并不能重复')
package_scan_min = fields.Integer(
'一键全扫完成时间(min)', help='输入示范:10,即表示在10分钟内大包时间随机分配,并不能重复')
is_package_scan = fields.Boolean(
'一键全扫开关', default=False, config_parameter='is_package_scan')
@api.model
def get_values(self):
......@@ -24,10 +27,9 @@ class ResConfigSettings(models.TransientModel):
config = self.env['ir.config_parameter'].sudo()
before_min = config.get_param('before_min', default=10)
package_scan_min = config.get_param('package_scan_min', default=10)
values.update(
before_min=before_min,
package_scan_min=package_scan_min,
package_scan_min=package_scan_min
)
return values
......
......@@ -7,11 +7,13 @@
<sheet>
<group>
<group>
<field name="logo" widget="image" class="oe_avatar"/>
<field name="name"/> <!-- 快递名称 -->
<field name="abbreviation"/> <!-- 简称 -->
<field name="tape_color_value" widget="color" required="1"/> <!-- 胶带色值 -->
<field name="matching_value"
placeholder="Multiple entries can be made, one matching value per line"/> <!-- 尾程服务商匹配值 -->
<field name="placement_area"/>
</group>
<group>
<field name="active"/> <!-- 有效☑️ -->
......@@ -31,6 +33,7 @@
<field name="abbreviation"/> <!-- 简称 -->
<field name="tape_color_value"/> <!-- 胶带色值 -->
<field name="matching_value"/> <!-- 尾程服务商匹配值 -->
<field name="placement_area"/> <!-- 摆放区域 -->
<field name="active"/> <!-- 有效☑️ -->
</tree>
</field>
......@@ -43,6 +46,7 @@
<search string="Last Mile Provider">
<field name="name"/>
<field name="abbreviation"/>
<field name="placement_area"/>
</search>
</field>
</record>
......
......@@ -27,6 +27,11 @@
<div class="col-12 col-lg-6 o_setting_box">
<div class="o_setting_left_pane"/>
<div class="o_setting_right_pane">
<div class="text-muted">
<label for="is_package_scan"/>
<field name="is_package_scan"/>
</div>
<div class="text-muted">
<label for="package_scan_min"/>
<field name="package_scan_min"/>
......
from . import tt_controllers
from . import order_controller
from . import binary
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
from odoo.addons.web.controllers.binary import Binary
try:
from werkzeug.utils import send_file
except ImportError:
from odoo.tools._vendor.send_file import send_file
from odoo import http
from odoo.exceptions import UserError
from odoo.http import request
from odoo.tools import replace_exceptions, str2bool
from odoo.tools.image import image_guess_size_from_field_name
_logger = logging.getLogger(__name__)
BAD_X_SENDFILE_ERROR = """\
Odoo is running with --x-sendfile but is receiving /web/filestore requests.
With --x-sendfile enabled, NGINX should be serving the
/web/filestore route, however Odoo is receiving the
request.
This usually indicates that NGINX is badly configured,
please make sure the /web/filestore location block exists
in your configuration file and that it is similar to:
location /web/filestore {{
internal;
alias {data_dir}/filestore;
}}
"""
def clean(name):
return name.replace('\x3c', '')
class AttachmentBinary(Binary):
@http.route(['/web/image',
'/web/image/<string:xmlid>',
'/web/image/<string:xmlid>/<string:filename>',
'/web/image/<string:xmlid>/<int:width>x<int:height>',
'/web/image/<string:xmlid>/<int:width>x<int:height>/<string:filename>',
'/web/image/<string:model>/<int:id>/<string:field>',
'/web/image/<string:model>/<int:id>/<string:field>/<string:filename>',
'/web/image/<string:model>/<int:id>/<string:field>/<int:width>x<int:height>',
'/web/image/<string:model>/<int:id>/<string:field>/<int:width>x<int:height>/<string:filename>',
'/web/image/<int:id>',
'/web/image/<int:id>/<string:filename>',
'/web/image/<int:id>/<int:width>x<int:height>',
'/web/image/<int:id>/<int:width>x<int:height>/<string:filename>',
'/web/image/<int:id>-<string:unique>',
'/web/image/<int:id>-<string:unique>/<string:filename>',
'/web/image/<int:id>-<string:unique>/<int:width>x<int:height>',
'/web/image/<int:id>-<string:unique>/<int:width>x<int:height>/<string:filename>'], type='http',
auth="public", cors="*")
# pylint: disable=redefined-builtin,invalid-name
def content_image(self, xmlid=None, model='ir.attachment', id=None, field='raw',
filename_field='name', filename=None, mimetype=None, unique=False,
download=False, width=0, height=0, crop=False, access_token=None,
nocache=False):
try:
record = request.env['ir.binary'].sudo()._find_record(xmlid, model, id and int(id), access_token)
stream = request.env['ir.binary'].sudo()._get_image_stream_from(
record, field, filename=filename, filename_field=filename_field,
mimetype=mimetype, width=int(width), height=int(height), crop=crop,
)
except UserError as exc:
if download:
raise request.not_found() from exc
# Use the ratio of the requested field_name instead of "raw"
if (int(width), int(height)) == (0, 0):
width, height = image_guess_size_from_field_name(field)
record = request.env.ref('web.image_placeholder').sudo()
stream = request.env['ir.binary']._get_image_stream_from(
record, 'raw', width=int(width), height=int(height), crop=crop,
)
send_file_kwargs = {'as_attachment': download}
if unique:
send_file_kwargs['immutable'] = True
send_file_kwargs['max_age'] = http.STATIC_CACHE_LONG
if nocache:
send_file_kwargs['max_age'] = None
return stream.get_response(**send_file_kwargs)
@http.route(['/web/content',
'/web/content/<string:xmlid>',
'/web/content/<string:xmlid>/<string:filename>',
'/web/content/<int:id>',
'/web/content/<int:id>/<string:filename>',
'/web/content/<string:model>/<int:id>/<string:field>',
'/web/content/<string:model>/<int:id>/<string:field>/<string:filename>'], type='http', auth="public",
cors="*")
# pylint: disable=redefined-builtin,invalid-name
def content_common(self, xmlid=None, model='ir.attachment', id=None, field='raw',
filename=None, filename_field='name', mimetype=None, unique=False,
download=False, access_token=None, nocache=False):
with replace_exceptions(UserError, by=request.not_found()):
record = request.env['ir.binary'].sudo()._find_record(xmlid, model, id and int(id), access_token)
stream = request.env['ir.binary'].sudo()._get_stream_from(record, field, filename, filename_field, mimetype)
if request.httprequest.args.get('access_token'):
stream.public = True
send_file_kwargs = {'as_attachment': str2bool(download)}
if unique:
send_file_kwargs['immutable'] = True
send_file_kwargs['max_age'] = http.STATIC_CACHE_LONG
if nocache:
send_file_kwargs['max_age'] = None
return stream.get_response(**send_file_kwargs)
......@@ -49,7 +49,8 @@ class OrderController(http.Controller):
# system_user = system_user.with_context(lang=lang)
try:
if kwargs.get('login') and kwargs.get('password'):
uid = request.session.authenticate(request.session.db, kwargs['login'], kwargs['password'])
uid = request.session.authenticate(
request.session.db, kwargs['login'], kwargs['password'])
user = request.env['res.users'].sudo().browse(uid)
res['user_info'] = {
'id': uid, # 用户id
......@@ -58,16 +59,33 @@ class OrderController(http.Controller):
res['state'] = 200
else:
res['state'] = 202
res['message'] = error_msg_dic[pda_lang] # _('Login name and password cannot be empty') # 登录名、密码不能为空
# _('Login name and password cannot be empty') # 登录名、密码不能为空
res['message'] = error_msg_dic[pda_lang]
except exceptions.AccessDenied as e:
if e.args == exceptions.AccessDenied().args:
res['message'] = exceptions_msg_dic[pda_lang] # _("Wrong login/password") # 错误的登录名或密码
# _("Wrong login/password") # 错误的登录名或密码
res['message'] = exceptions_msg_dic[pda_lang]
else:
res['message'] = e.args[0]
logging.info('api_cc_login error:%s' % res)
return res
@http.route('/api/one_key_full_scan/info', type='json', auth='public', csrf=False)
def one_key_full_scan_info(self):
"""
查询一键全扫信息接口,返回是否一键全扫和完成时间
入参:bl_no
"""
# kwargs = json.loads(request.httprequest.data)
res = {'state': 201, 'message': ''}
res['package_scan_min'] = int(
request.env['res.config.settings'].sudo().get_values().get('package_scan_min'))
res['is_package_scan'] = bool(request.env['ir.config_parameter'].sudo(
).get_param('is_package_scan'))
res['state'] = 200
return res
@http.route('/api/bl/info', type='json', auth='public', methods=['GET', 'POST'], csrf=False)
def bl_info(self):
"""
......@@ -85,17 +103,22 @@ class OrderController(http.Controller):
bl_no = kwargs['bl_no']
# bl_obj = request.env['cc.bl'].sudo().search([('bl_no', '=', bl_no)]) # 提单
state_arr = ['draft', 'ccing']
bl_obj = request.env['cc.bl'].sudo().deal_bl_no(bl_no) # 提单号去掉杠和空格,并转换为小写
bl_obj = request.env['cc.bl'].sudo().deal_bl_no(
bl_no) # 提单号去掉杠和空格,并转换为小写
if bl_obj:
if bl_obj.state in state_arr:
res['bl_info'] = bl_obj.search_bl_info(pda_lang=pda_lang, type=action_type)
res['package_scan_min'] = int(request.env['res.config.settings'].sudo().get_values().get('package_scan_min'))
res['bl_info'] = bl_obj.search_bl_info(
pda_lang=pda_lang, type=action_type)
res['package_scan_min'] = int(
request.env['res.config.settings'].sudo().get_values().get('package_scan_min'))
res['state'] = 200
else:
res['message'] = bill_state_msg_dic[pda_lang] # 没有在系统中找到未完成清关的该提单信息
# 没有在系统中找到未完成清关的该提单信息
res['message'] = bill_state_msg_dic[pda_lang]
else:
res['state'] = 404
res['message'] = bill_noexist_msg_dic[pda_lang] # _('Bill of lading does not exist') # 提单不存在
# _('Bill of lading does not exist') # 提单不存在
res['message'] = bill_noexist_msg_dic[pda_lang]
else:
res['state'] = 202
res['message'] = bill_null_msg_dic[
......@@ -115,7 +138,6 @@ class OrderController(http.Controller):
def update_big_package_tally_detail(self):
"""
修改理货信息
:param kwargs:
:return:
"""
kwargs = json.loads(request.httprequest.data)
......@@ -128,7 +150,8 @@ class OrderController(http.Controller):
kwargs.get('big_package_arr') or kwargs.get('ship_package_arr') or kwargs.get('pallet_arr')):
bl_no = kwargs['bl_no']
state_arr = ['draft', 'ccing']
bl_obj = request.env['cc.bl'].sudo().deal_bl_no(bl_no) # 提单号去掉杠和空格,并转换为小写
bl_obj = request.env['cc.bl'].sudo().deal_bl_no(
bl_no) # 提单号去掉杠和空格,并转换为小写
if bl_obj and bl_obj.state in state_arr:
ship_packages = []
big_package_exception_arr = {}
......@@ -141,7 +164,8 @@ class OrderController(http.Controller):
file_str = 'big_package_no' if package_type == 'big' else (
'logistic_order_no' if package_type == 'ship' else 'pallet_number') # 大包号/物流订单号
package_no = package_item.get(file_str) # 包裹号
exception_cause_ids = package_item.get('exception_cause_ids') # 异常原因id数组
exception_cause_ids = package_item.get(
'exception_cause_ids') # 异常原因id数组
if package_type == 'pallet':
package_obj = request.env['cc.big.package'].sudo().search(
[('pallet_number', '=', package_no), ('bl_id', '=', bl_obj.id)]) # 多个
......@@ -153,13 +177,16 @@ class OrderController(http.Controller):
for excep_item in exception_cause_ids:
if package_type == 'ship':
if excep_item not in ship_package_exception_arr:
ship_package_exception_arr[excep_item] = []
ship_package_exception_arr[excep_item] = [
]
ship_package_exception_arr[excep_item] += package_obj.ids
else:
if excep_item not in big_package_exception_arr:
big_package_exception_arr[excep_item] = []
big_package_exception_arr[excep_item] = [
]
big_package_exception_arr[excep_item] += package_obj.ids
package_obj.update_exception_info(exception_cause_ids) # 修改异常信息
package_obj.update_exception_info(
exception_cause_ids) # 修改异常信息
tally_time = package_item.get('tally_time')
if (action_type == 'tally' and package_item.get('tally_state') == 'checked_goods') or (
action_type == 'handover' and package_item.get(
......@@ -179,7 +206,8 @@ class OrderController(http.Controller):
'tally_time': tally_time
}) # 小包
package_obj.update_big_package_info(action_type=action_type,
tally_state=package_item.get('tally_state'),
tally_state=package_item.get(
'tally_state'),
tally_user_id=package_item.get(
'tally_user_id'),
tally_time=tally_time) # 修改理货信息
......@@ -189,7 +217,8 @@ class OrderController(http.Controller):
# 处理小包、大包和托盘
if kwargs.get('ship_package_arr'):
error_no_arr = process_packages(kwargs['ship_package_arr'], 'ship', ship_packages)
error_no_arr = process_packages(
kwargs['ship_package_arr'], 'ship', ship_packages)
if error_no_arr:
res['message'] = {
'en': 'Ship package number [%s] does not exist' % ','.join(error_no_arr),
......@@ -198,7 +227,8 @@ class OrderController(http.Controller):
return res
if kwargs.get('big_package_arr'):
error_no_arr = process_packages(kwargs['big_package_arr'], 'big', ship_packages)
error_no_arr = process_packages(
kwargs['big_package_arr'], 'big', ship_packages)
if error_no_arr:
res['message'] = {
'en': 'Big package number [%s] does not exist' % ','.join(error_no_arr),
......@@ -207,7 +237,8 @@ class OrderController(http.Controller):
return res
if kwargs.get('pallet_arr'):
error_no_arr = process_packages(kwargs['pallet_arr'], 'pallet', ship_packages)
error_no_arr = process_packages(
kwargs['pallet_arr'], 'pallet', ship_packages)
if error_no_arr:
res['message'] = {
'en': 'Tray number [%s] does not have a corresponding big package' % ','.join(
......@@ -238,10 +269,12 @@ class OrderController(http.Controller):
email_language=lang)
ship_wizard_obj.confirm() # 发送邮件
res['state'] = 200
logging.info('update_big_package_tally_detail ship_packages:%s' % len(ship_packages))
logging.info(
'update_big_package_tally_detail ship_packages:%s' % len(ship_packages))
# 有小包 就更新小包状态和同步
if ship_packages:
redis_conn = request.env['common.common'].sudo().get_redis()
redis_conn = request.env['common.common'].sudo(
).get_redis()
if redis_conn and redis_conn != 'no':
# redis_conn.lpush('push_ship_package_state', json.dumps(
# {'bl_id': bl_obj.id, 'ship_package_ids': ship_package_ids}))
......@@ -277,8 +310,10 @@ class OrderController(http.Controller):
try:
logging.info('exceptions_info kwargs:%s' % kwargs)
lang = 'zh_CN' if pda_lang == 'zh' else 'en_US' # 语言
exception_obj = request.env['cc.exception.info'].sudo().with_context({'lang': lang}).search([]) # 包裹异常原因
res['exception_arr'] = [excep.search_exception_info(pda_lang=pda_lang) for excep in exception_obj]
exception_obj = request.env['cc.exception.info'].sudo(
).with_context({'lang': lang}).search([]) # 包裹异常原因
res['exception_arr'] = [excep.search_exception_info(
pda_lang=pda_lang) for excep in exception_obj]
res['state'] = 200
except Exception as e:
exceptions_msg_dic = {
......@@ -290,3 +325,49 @@ class OrderController(http.Controller):
pda_lang] # _('System parsing error, the reason for the error is %s', e) # 系统解析错误,错误原因是
logging.info('exceptions_info res:%s' % res)
return res
@http.route('/api/last_mile/tally', type='json', auth='public', csrf=False)
def last_mile_tally(self):
"""
按尾程快递理货的接口,查询所有清关中的提单关联的大包,且大包状态为未理货的数量,按下一阶段服务商名称分组,下一个阶段服务商名称匹配尾程快递;传给PDA。
尾程快递名称,摆放区域,快递LOGO,大包件数
大包信息字段:大包号,尾程快递名称,尾程快递色值;
"""
return self._get_last_mile_grouped('unprocessed_goods')
@http.route('/api/last_mile/delivery', type='json', auth='public', csrf=False)
def last_mile_delivery(self):
"""
按尾程快递交货接口,查询系统所有清关中的提单关联的大包,且大包状态为已理货的数量,按下一阶段服务商名称分组,下一个阶段服务商名称匹配尾程快递传给PDA。传输字段与理货的一致。
"""
return self._get_last_mile_grouped('checked_goods')
def _get_last_mile_grouped(self, tally_state):
# 1. 查所有清关中提单
bls = request.env['cc.bl'].sudo().search([('state', '=', 'ccing')])
# 2. 查所有大包
big_packages = request.env['cc.big.package'].sudo().search([
('bl_id', 'in', bls.ids),
('tally_state', '=', tally_state)
])
# 3. 按"下一阶段服务商名称"分组
group_dict = {}
for pkg in big_packages:
provider = request.env['cc.last.mile.provider'].sudo(
).match_provider(pkg.next_provider_name)
if not provider:
continue
key = provider.id
if key not in group_dict:
group_dict[key] = provider.search_pro_info() # 查询快递信息
group_dict[key]['count'] = 0
group_dict[key]['big_packages'] = []
group_dict[key]['count'] += 1
group_dict[key]['big_packages'].append({
'big_package_no': pkg.big_package_no or '', # 大包号
'provider_name': provider.name or '', # 尾程快递名称
'tape_color_value': provider.tape_color_value, # 色值
})
# 4. 返回
return {'provider_info_arr': list(group_dict.values()), 'state': 200}
......@@ -6,6 +6,8 @@ from . import res_config_setting
from . import ao_tt_api_log
from . import cc_node
from . import cc_bill_loading
from . import ir_attachment
from . import http
# -*- coding: utf-8 -*-
import logging
from datetime import datetime, timedelta
import time
import hashlib
from odoo import models
from odoo.http import request
__author__ = 'yubo.peng'
_logger = logging.getLogger(__name__)
class AuthenticationError(Exception):
pass
class Http(models.AbstractModel):
_inherit = 'ir.http'
@classmethod
def _auth_method_erp_token(cls):
# 从headers.environ中获取对方传过来的token,timestamp,加密的校验字符串
datas = request.httprequest.headers.environ
if 'HTTP_TOKEN' in datas and 'HTTP_TIMESTAMP' in datas and 'HTTP_CHECKSTR' in datas:
# 从系统参数中获取TOKEt和密钥
factory_secret = request.env['ir.config_parameter'].sudo().get_param('erp_token')
erp_secret_key = request.env['ir.config_parameter'].sudo().get_param('erp_secret_key')
# 从系统参数获取是否时间校验,-1为不校验,其他为校验
check_timeout = int(request.env['ir.config_parameter'].sudo().get_param('check_timeout') or 5)
if not factory_secret:
raise AuthenticationError('系统中未设置ERP TOKEN')
if datas['HTTP_TOKEN'] != factory_secret:
raise AuthenticationError('无效的token')
if check_timeout > 0:
post_time = int(datas['HTTP_TIMESTAMP'])
datetime_post = datetime.fromtimestamp(post_time)
datetime_now = datetime.now().replace(microsecond=0)
datetime_del = datetime_now + timedelta(seconds=check_timeout)
if datetime_post > datetime_del:
raise AuthenticationError('请求已过期')
# 获得sha1_str加密字符串
check_str = '%s%s%s' % (datas['HTTP_TOKEN'], datas['HTTP_TIMESTAMP'], erp_secret_key)
check_crm_str = hashlib.sha1(check_str.encode('utf-8')).hexdigest()
if check_crm_str.upper() != datas['HTTP_CHECKSTR'].upper():
raise AuthenticationError('数据校验不通过')
else:
raise AuthenticationError('请求参数中无token')
# -*- coding: utf-8 -*-
from odoo import models, api, _
from collections import defaultdict
from odoo.exceptions import AccessError
class IrAttachment(models.Model):
_inherit = "ir.attachment"
# override
@api.model
def check(self, mode, values=None):
""" Restricts the access to an ir.attachment, according to referred mode """
if self.env.is_superuser():
return True
# Always require an internal user (aka, employee) to access to a attachment
if not (self.env.is_admin() or self.env.user.has_group('base.group_user')):
raise AccessError(
_("Sorry, you are not allowed to access this document."))
# collect the records to check (by model)
model_ids = defaultdict(set) # {model_name: set(ids)}
if self:
# DLE P173: `test_01_portal_attachment`
self.env['ir.attachment'].flush(['res_model', 'res_id', 'create_uid', 'public', 'res_field'])
self._cr.execute('SELECT res_model, res_id, create_uid, public, res_field FROM ir_attachment WHERE id IN %s', [tuple(self.ids)])
for res_model, res_id, create_uid, public, res_field in self._cr.fetchall():
if public and mode == 'read':
continue
if not (res_model and res_id):
continue
model_ids[res_model].add(res_id)
if values and values.get('res_model') and values.get('res_id'):
model_ids[values['res_model']].add(values['res_id'])
# check access rights on the records
for res_model, res_ids in model_ids.items():
# ignore attachments that are not attached to a resource anymore
# when checking access rights (resource was deleted but attachment
# was not)
if res_model not in self.env:
continue
if res_model == 'res.users' and len(res_ids) == 1 and self.env.uid == list(res_ids)[0]:
# by default a user cannot write on itself, despite the list of writeable fields
# e.g. in the case of a user inserting an image into his image signature
# we need to bypass this check which would needlessly throw us away
continue
records = self.env[res_model].browse(res_ids).exists()
# For related models, check if we can write to the model, as unlinking
# and creating attachments can be seen as an update to the model
access_mode = 'write' if mode in ('create', 'unlink') else mode
records.check_access_rights(access_mode)
records.check_access_rule(access_mode)
@api.model
def read_as_sudo(self, domain=None, fields=None):
return self.sudo().search_read(domain, fields)
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论