1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
|
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import base64
import json
import logging
import re
import requests
from stdnum.eu.vat import check_vies
from odoo import api, fields, models, _
from odoo.tools.image import base64_to_image
_logger = logging.getLogger(__name__)

# Timeout handed to ``requests.get`` when downloading a company logo
# (see ``_iap_replace_logo``).
PARTNER_AC_TIMEOUT = 5

# Two-letter VAT prefixes for which a partner's VAT is considered syncable
# (see ``_is_vat_syncable``).
SUPPORTED_VAT_PREFIXES = {
    'AT', 'BE', 'BG', 'CY', 'CZ', 'DE', 'DK', 'EE', 'EL', 'ES', 'FI',
    'FR', 'HR', 'HU', 'IE', 'IT', 'LT', 'LU', 'LV', 'MT', 'NL', 'PL',
    'PT', 'RO', 'SE', 'SI', 'SK', 'XI', 'EU',
}

# VAT prefixes that differ from the country code they must be compared with
# (see ``_is_company_in_europe``); any other prefix is compared as-is.
VAT_COUNTRY_MAPPING = {
    'EL': 'GR',  # Greece
    'XI': 'GB',  # United Kingdom (Northern Ireland)
}
class ResPartner(models.Model):
    """Extend ``res.partner`` with the partner-autocomplete helpers.

    The model-level helpers query the ``iap.autocomplete.api`` model (and VIES
    as a fallback) and normalise the returned dictionaries; the record-level
    helpers decide whether a partner must be queued in
    ``res.partner.autocomplete.sync`` after a create/write.
    """
    _name = 'res.partner'
    _inherit = 'res.partner'

    # Identifier sent as ``partner_gid`` in the 'enrich' request.
    partner_gid = fields.Integer('Company database ID')
    # JSON-encoded extra data (written by ``_format_data_company``), consumed
    # and cleared by ``create``.
    additional_info = fields.Char('Additional info')

    @api.model
    def _iap_replace_location_codes(self, iap_data):
        """Replace country/state codes and names by record references.

        The keys ``country_code``, ``country_name``, ``state_code`` and
        ``state_name`` are removed from ``iap_data``. When a matching country
        (resp. state) is found, ``country_id`` (resp. ``state_id``) is set to
        a ``{'id': ..., 'display_name': ...}`` dictionary.

        :param dict iap_data: company data; modified in place
        :return: the same ``iap_data`` dictionary
        :rtype: dict
        """
        country_code = iap_data.pop('country_code', False)
        country_name = iap_data.pop('country_name', False)
        state_code = iap_data.pop('state_code', False)
        state_name = iap_data.pop('state_name', False)

        Country = self.env['res.country']
        State = self.env['res.country.state']

        country, state = None, None
        # ``limit=1``: ``=ilike`` interprets ``%`` and ``_`` as wildcards, so
        # more than one record could match and ``country.id`` would then fail.
        if country_code:
            country = Country.search([('code', '=ilike', country_code)], limit=1)
        if not country and country_name:
            country = Country.search([('name', '=ilike', country_name)], limit=1)

        if country:
            # Look the state up by code first, then fall back on its name.
            if state_code:
                state = State.search([
                    ('country_id', '=', country.id), ('code', '=ilike', state_code),
                ], limit=1)
            if not state and state_name:
                state = State.search([
                    ('country_id', '=', country.id), ('name', '=ilike', state_name),
                ], limit=1)
            iap_data['country_id'] = {'id': country.id, 'display_name': country.display_name}

        if state:
            iap_data['state_id'] = {'id': state.id, 'display_name': state.display_name}

        return iap_data

    @api.model
    def _iap_replace_logo(self, iap_data):
        """Download the logo referenced by ``iap_data['logo']``.

        When ``logo`` is set, it is removed from ``iap_data`` and the
        downloaded content is stored base64-encoded under ``image_1920``
        (``False`` when the download raised). Content that
        ``base64_to_image`` cannot load is dropped again.

        :param dict iap_data: company data; modified in place
        :return: the same ``iap_data`` dictionary
        :rtype: dict
        """
        if iap_data.get('logo'):
            try:
                iap_data['image_1920'] = base64.b64encode(
                    requests.get(iap_data['logo'], timeout=PARTNER_AC_TIMEOUT).content
                )
            except Exception:
                # Best effort: a failing download must not break autocomplete.
                iap_data['image_1920'] = False
            finally:
                iap_data.pop('logo')
            # avoid keeping falsy images (may happen that a blank page is returned that leads to an incorrect image)
            if iap_data['image_1920']:
                try:
                    base64_to_image(iap_data['image_1920'])
                except Exception:
                    iap_data.pop('image_1920')
        return iap_data

    @api.model
    def _format_data_company(self, iap_data):
        """Normalise a company dictionary and its children.

        Location codes are resolved on the company and on every entry of
        ``child_ids``; a truthy ``additional_info`` is serialised to JSON.

        :param dict iap_data: company data; modified in place
        :return: the same ``iap_data`` dictionary
        :rtype: dict
        """
        self._iap_replace_location_codes(iap_data)

        if iap_data.get('child_ids'):
            iap_data['child_ids'] = [
                self._iap_replace_location_codes(child)
                for child in iap_data['child_ids']
            ]

        if iap_data.get('additional_info'):
            iap_data['additional_info'] = json.dumps(iap_data['additional_info'])

        return iap_data

    @api.model
    def autocomplete(self, query, timeout=15):
        """Return the formatted company suggestions for ``query``.

        :param query: search text sent with the 'search' request
        :param timeout: timeout forwarded to the autocomplete request
        :return: list of formatted company dictionaries (possibly empty)
        :rtype: list
        """
        # Named ``_error`` (not ``_``) to avoid shadowing the translation function.
        suggestions, _error = self.env['iap.autocomplete.api']._request_partner_autocomplete(
            'search', {'query': query}, timeout=timeout)
        if not suggestions:
            return []
        return [self._format_data_company(suggestion) for suggestion in suggestions]

    @api.model
    def enrich_company(self, company_domain, partner_gid, vat, timeout=15):
        """Return the enriched data of a company.

        :param company_domain: value sent as ``domain`` in the request
        :param partner_gid: value sent as ``partner_gid`` in the request
        :param vat: value sent as ``vat`` in the request
        :param timeout: timeout forwarded to the autocomplete request
        :return: formatted company data (or an empty dict), completed with
            ``error`` / ``error_message`` keys when the response reports a
            credit error or the request returned an error
        :rtype: dict
        """
        response, error = self.env['iap.autocomplete.api']._request_partner_autocomplete('enrich', {
            'domain': company_domain,
            'partner_gid': partner_gid,
            'vat': vat,
        }, timeout=timeout)

        company_data = response.get('company_data') if response else None
        result = self._format_data_company(company_data) if company_data else {}

        # A credit error reported in the response takes precedence over ``error``.
        if response and response.get('credit_error'):
            result.update({
                'error': True,
                'error_message': 'Insufficient Credit',
            })
        elif error:
            result.update({
                'error': True,
                'error_message': error,
            })

        return result

    @api.model
    def read_by_vat(self, vat, timeout=15):
        """Return the company matching ``vat``.

        The autocomplete service is queried first ('search_vat'); when it
        returns nothing, ``check_vies`` is called and its name/address are
        converted into a company dictionary flagged with ``skip_enrich``.

        :param vat: VAT number to look up
        :param timeout: timeout forwarded to both services
        :return: list holding one company dictionary, or an empty list
        :rtype: list
        """
        # Named ``_error`` (not ``_``) to avoid shadowing the translation function.
        vies_vat_data, _error = self.env['iap.autocomplete.api']._request_partner_autocomplete(
            'search_vat', {'vat': vat}, timeout=timeout)
        if vies_vat_data:
            return [self._format_data_company(vies_vat_data)]

        vies_result = None
        try:
            _logger.info('Calling VIES service to check VAT for autocomplete: %s', vat)
            vies_result = check_vies(vat, timeout=timeout)
        except Exception:
            # Best effort: VIES being unavailable only means "no result".
            _logger.warning("Failed VIES VAT check.", exc_info=True)
        if not vies_result:
            return []

        name = vies_result['name']
        if not vies_result['valid'] or name == '---':
            return []

        # Non-empty address lines; a missing address yields no line at all
        # instead of raising.
        address = [line for line in (vies_result['address'] or '').split('\n') if line]
        street = address[0] if address else None
        other_lines = address[1:]

        # The first remaining line starting with a digit is taken as "<zip> <city>".
        zip_city_record = next((line for line in other_lines if re.match(r'^\d.*', line)), None)
        zip_code, city = None, None
        if zip_city_record:
            zip_city = zip_city_record.split(' ', 1)
            zip_code = zip_city[0]
            # A line without any space only carries the zip code.
            city = zip_city[1] if len(zip_city) > 1 else None
        street2 = next((line for line in other_lines if line != zip_city_record), None)

        return [self._iap_replace_location_codes({
            'name': name,
            'vat': vat,
            'street': street,
            'street2': street2,
            'city': city,
            'zip': zip_code,
            'country_code': vies_result['countryCode'],
            'skip_enrich': True,
        })]

    @api.model
    def _is_company_in_europe(self, partner_country_code, vat_country_code):
        """Tell whether the VAT prefix designates the partner's country.

        The prefix is first translated through ``VAT_COUNTRY_MAPPING``
        (e.g. ``EL`` -> ``GR``).

        :param str partner_country_code: country code of the partner
        :param str vat_country_code: two-letter prefix of the VAT number
        :rtype: bool
        """
        return partner_country_code == VAT_COUNTRY_MAPPING.get(vat_country_code, vat_country_code)

    def _is_vat_syncable(self, vat):
        """Tell whether ``vat`` qualifies the partner for synchronisation.

        A VAT qualifies when either its prefix is in
        ``SUPPORTED_VAT_PREFIXES`` or it matches a GSTIN format, and in both
        cases the partner's country matches (the prefix country, resp. the
        country of ``base.in``) or the partner has no country.

        :param str vat: VAT number to evaluate
        :rtype: bool
        """
        if not vat:
            return False
        vat_country_code = vat[:2]
        partner_country_code = self.country_id.code if self.country_id else ''

        # Check if the VAT prefix is supported and corresponds to the partner's country or no country is set
        is_vat_supported = (
            vat_country_code in SUPPORTED_VAT_PREFIXES
            and (not partner_country_code
                 or self._is_company_in_europe(partner_country_code, vat_country_code)))

        # Same rule for GST numbers. The parentheses matter: ``and`` binds
        # tighter than ``or``, so without them every VAT of a partner without
        # country would be accepted whatever its format.
        is_gst_supported = (
            self.check_gst_in(vat)
            and (not partner_country_code
                 or partner_country_code == self.env.ref('base.in').code))

        return is_vat_supported or is_gst_supported

    def check_gst_in(self, vat):
        """Tell whether ``vat`` is a 15-character string matching a GSTIN format.

        :param str vat: number to check
        :rtype: bool
        """
        # reference from https://www.gstzen.in/a/format-of-a-gst-number-gstin.html
        if not vat or len(vat) != 15:
            return False
        all_gstin_re = [
            r'\d{2}[a-zA-Z]{5}\d{4}[a-zA-Z][1-9A-Za-z][Zz1-9A-Ja-j][0-9a-zA-Z]',  # Normal, Composite, Casual GSTIN
            r'\d{4}[A-Z]{3}\d{5}[UO]N[A-Z0-9]',  # UN/ON Body GSTIN
            r'\d{4}[a-zA-Z]{3}\d{5}NR[0-9a-zA-Z]',  # NRI GSTIN
            r'\d{2}[a-zA-Z]{4}[a-zA-Z0-9]\d{4}[a-zA-Z][1-9A-Za-z][DK][0-9a-zA-Z]',  # TDS GSTIN
            r'\d{2}[a-zA-Z]{5}\d{4}[a-zA-Z][1-9A-Za-z]C[0-9a-zA-Z]',  # TCS GSTIN
        ]
        return any(re.match(rx, vat) for rx in all_gstin_re)

    def _is_synchable(self):
        """Tell whether the partner may be queued for synchronisation.

        Truthy when the partner is a company with a ``partner_gid`` and no
        ``res.partner.autocomplete.sync`` record flags it as already synched.
        """
        already_synched = self.env['res.partner.autocomplete.sync'].search([
            ('partner_id', '=', self.id), ('synched', '=', True),
        ])
        return self.is_company and self.partner_gid and not already_synched

    def _update_autocomplete_data(self, vat):
        """Queue the partner for synchronisation when it qualifies.

        :param vat: VAT number found in the create/write values (may be falsy)
        """
        self.ensure_one()
        if vat and self._is_synchable() and self._is_vat_syncable(vat):
            self.env['res.partner.autocomplete.sync'].sudo().add_to_queue(self.id)

    @api.model_create_multi
    def create(self, vals_list):
        """Create partners; post the autocomplete note on single creation.

        Only when exactly one partner is created: the partner is queued for
        synchronisation if it qualifies, and a non-empty ``additional_info``
        is rendered through the ``iap_mail.enrich_company`` template as a
        note, then cleared.
        """
        partners = super().create(vals_list)
        if len(vals_list) == 1:
            partners._update_autocomplete_data(vals_list[0].get('vat', False))
            if partners.additional_info:
                template_values = json.loads(partners.additional_info)
                template_values['flavor_text'] = _("Partner created by Odoo Partner Autocomplete Service")
                partners.message_post_with_source(
                    'iap_mail.enrich_company',
                    render_values=template_values,
                    subtype_xmlid='mail.mt_note',
                )
                # The data has been posted; do not keep it on the record.
                partners.write({'additional_info': False})

        return partners

    def write(self, values):
        """Write ``values``; re-evaluate synchronisation for a single partner."""
        res = super().write(values)
        if len(self) == 1:
            self._update_autocomplete_data(values.get('vat', False))

        return res

    @api.model
    def _get_view(self, view_id=None, view_type='form', **options):
        """Set the autocomplete widget on ``name`` and ``vat`` in form views."""
        arch, view = super()._get_view(view_id, view_type, **options)

        if view_type == 'form':
            for node in arch.xpath("//field[@name='name' or @name='vat']"):
                node.set('widget', 'field_partner_autocomplete')

        return arch, view
|