# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import base64
import codecs
import collections
import csv
import difflib
import unicodedata
import chardet
import datetime
import io
import itertools
import logging
import psycopg2
import operator
import os
import re
import requests
from PIL import Image
from collections import defaultdict
from odoo import api, fields, models
from odoo.exceptions import UserError
from odoo.tools.translate import _
from odoo.tools.mimetypes import guess_mimetype
from odoo.tools import config, DEFAULT_SERVER_DATE_FORMAT, DEFAULT_SERVER_DATETIME_FORMAT, parse_version
FIELDS_RECURSION_LIMIT = 3
ERROR_PREVIEW_BYTES = 200
DEFAULT_IMAGE_TIMEOUT = 3
DEFAULT_IMAGE_MAXBYTES = 10 * 1024 * 1024
DEFAULT_IMAGE_REGEX = r"^(?:http|https)://"
DEFAULT_IMAGE_CHUNK_SIZE = 32768
IMAGE_FIELDS = ["icon", "image", "logo", "picture"]
_logger = logging.getLogger(__name__)
BOM_MAP = {
'utf-16le': codecs.BOM_UTF16_LE,
'utf-16be': codecs.BOM_UTF16_BE,
'utf-32le': codecs.BOM_UTF32_LE,
'utf-32be': codecs.BOM_UTF32_BE,
}
try:
import xlrd
try:
from xlrd import xlsx
except ImportError:
xlsx = None
except ImportError:
xlrd = xlsx = None
if xlsx:
from lxml import etree
# xlrd.xlsx supports defusedxml, defusedxml's etree interface is broken
# (missing ElementTree and thus ElementTree.iter) which causes a fallback to
# Element.getiterator(), triggering a warning before 3.9 and an error from 3.9.
#
# We have defusedxml installed because zeep has a hard dep on defused and
# doesn't want to drop it (mvantellingen/python-zeep#1014).
#
# Ignore the check and set the relevant flags directly using lxml as we have a
# hard dependency on it.
xlsx.ET = etree
xlsx.ET_has_iterparse = True
xlsx.Element_has_iter = True
try:
from . import odf_ods_reader
except ImportError:
odf_ods_reader = None
try:
from openpyxl import load_workbook
except ImportError:
load_workbook = None
FILE_TYPE_DICT = {
'text/csv': ('csv', True, None),
'application/vnd.ms-excel': ('xls', xlrd, 'xlrd'),
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': (
'xlsx',
load_workbook or xlsx,
# if xlrd 2.x then xlsx is not available, so don't suggest it
'openpyxl' if xlrd and parse_version(xlrd.__VERSION__) >= parse_version("2.0") else 'openpyxl or xlrd >= 1.0.0 < 2.0',
),
'application/vnd.oasis.opendocument.spreadsheet': ('ods', odf_ods_reader, 'odfpy')
}
EXTENSIONS = {
'.' + ext: handler
for mime, (ext, handler, req) in FILE_TYPE_DICT.items()
}
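# Illustrative note (not part of the original module): EXTENSIONS maps dotted
# file extensions to the handler entry of FILE_TYPE_DICT, e.g. roughly
#   {'.csv': True, '.xls': xlrd, '.xlsx': load_workbook or xlsx, '.ods': odf_ods_reader}
# where a falsy handler means the corresponding reader library is not installed.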
class ImportValidationError(Exception):
"""
This class is used to consistently format all the different error types that
can occur during the pre-validation of the import, which is performed before
calling the data loading itself. The error data structure mirrors that of the
errors raised during data loading, which simplifies error handling on the
client side as all errors can be treated the same way.
This exception is typically raised when there is an error during data
parsing (image, int, dates, etc..) or if the user did not select at least
one field to map with a column.
"""
def __init__(self, message, **kwargs):
super().__init__(message)
self.type = kwargs.get('error_type', 'error')
self.message = message
self.record = False
self.not_matching_error = True
self.field_path = [kwargs['field']] if kwargs.get('field') else False
self.field_type = kwargs.get('field_type')
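# Usage sketch (illustrative only; the field name below is hypothetical):
#   raise ImportValidationError(
#       _("Column contains incorrect values"),
#       field='partner_id', field_type='many2one')
# produces an error whose attributes (type='error', field_path=['partner_id'],
# field_type='many2one') mirror the error dicts returned by load().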
class Base(models.AbstractModel):
_inherit = 'base'
@api.model
def get_import_templates(self):
"""
Get the import templates label and path.
:return: a list(dict) containing label and template path
like ``[{'label': 'foo', 'template': 'path'}]``
"""
return []
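# A minimal override sketch (illustrative; model, label and template path are
# hypothetical, not taken from this module):
#
#   class ResPartner(models.Model):
#       _inherit = 'res.partner'
#
#       @api.model
#       def get_import_templates(self):
#           return [{
#               'label': _("Import Template for Customers"),
#               'template': '/my_module/static/csv/res_partner.csv',
#           }]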
class ImportMapping(models.Model):
""" mapping of previous column:field selections
This is useful when repeatedly importing from a third-party
system: column names generated by the external system may
not match Odoo's field names or labels. This model is used
to save the mapping between column names and fields so that
next time a user imports from the same third-party system
we can automatically match the columns to the correct field
without them having to re-enter the mapping every single
time.
"""
_name = 'base_import.mapping'
_description = 'Base Import Mapping'
res_model = fields.Char(index=True)
column_name = fields.Char()
field_name = fields.Char()
class ResUsers(models.Model):
_inherit = 'res.users'
def _can_import_remote_urls(self):
""" Hook to decide whether the current user is allowed to import
images via URL (as such an import can DOS a worker). By default,
allows the administrator group.
:rtype: bool
"""
self.ensure_one()
return self._is_admin()
class Import(models.TransientModel):
"""
This model is used to prepare the loading of data coming from a user file.
Here is the process that is followed:
#. The user selects a file to import.
#. File parsing and mapping suggestion (see "parse_preview" method)
#. Extract the current model's importable fields tree (see :meth:`get_fields_tree`).
#. Read the file (see :meth:`_read_file`) and extract header names and file
length (used for batch import).
#. Extract header types from the data preview (first 10 lines of the file)
(see :meth:`_extract_headers_types`).
#. Try to find for each header a field to map with (see :meth:`_get_mapping_suggestions`)
- First check the previously saved mappings between the header name
and one of the model's fields.
- If no mapping found, try an exact match comparison using fields
technical names, labels and user language translated labels.
- If nothing found, try a fuzzy match using word distance between
header name and fields technical names, labels and user language
translated labels. Keep only the closest match.
#. Prepare examples for each column using its first non-null value.
#. Send the info back to the UI where the user can modify the suggested mapping.
#. Execute the import: there are two import modes which use the same process. (see :meth:`execute_import`)
#. Test import: try to import but roll back the transaction. This allows
checking errors during the import process and lets the user
choose import options for the different encountered errors.
#. Real import: try to import the file using the configured mapping and
any "error mapping options". If the import encounters blocking
errors, the transaction is rolled back and the user is allowed to
choose import options for the different errors.
- Get file data and fields to import into (see :meth:`_convert_import_data`).
- Parse date, float and binary data (see :meth:`_parse_import_data`).
- Handle multiple mapping -> concatenate char/text/many2many columns
mapped on the same field (see :meth:`_handle_multi_mapping`).
- Handle fallback values for boolean and selection fields, in case
input data does not match any allowed values (see :meth:`_handle_fallback_values`).
- Load data (see ir.model "load" method).
- Roll back the transaction in test mode or if an error was encountered.
- Save mapping if any import is successful to ease later mapping suggestions.
- Return import result to the UI (success or errors if any).
"""
_name = 'base_import.import'
_description = 'Base Import'
# allow imports to survive for 12h in case user is slow
_transient_max_hours = 12.0
# we consider that if the difference is more than 0.2, then the two compared strings are "too different" to propose
# any match between them. (see '_get_mapping_suggestion' for more details)
FUZZY_MATCH_DISTANCE = 0.2
res_model = fields.Char('Model')
file = fields.Binary('File', help="File to check and/or import, raw binary (not base64)", attachment=False)
file_name = fields.Char('File Name')
file_type = fields.Char('File Type')
@api.model
def get_fields_tree(self, model, depth=FIELDS_RECURSION_LIMIT):
""" Recursively get fields for the provided model (through
fields_get) and filter them according to importability
The output format is a list of :class:`Field`:
.. class:: Field
.. attribute:: id: str
A non-unique identifier for the field, used to compute
the span of the ``required`` attribute: if multiple
``required`` fields have the same id, only one of them
is necessary.
.. attribute:: name: str
The field's logical (Odoo) name within the scope of
its parent.
.. attribute:: string: str
The field's human-readable name (``@string``)
.. attribute:: required: bool
Whether the field is marked as required in the
model. Clients must provide non-empty import values
for all required fields or the import will error out.
.. attribute:: fields: list[Field]
The current field's subfields. The database and
external identifiers for m2o and m2m fields; a
filtered and transformed fields_get for o2m fields (to
a variable depth defined by ``depth``).
Fields with no sub-fields will have an empty list of
sub-fields.
.. attribute:: model_name: str
Used in the Odoo Field Tooltip on the import view
and to get the model of the field of the related field(s).
Name of the current field's model.
.. attribute:: comodel_name: str
Used in the Odoo Field Tooltip on the import view
and to get the model of the field of the related field(s).
Name of the current field's comodel, i.e. if the field is a relation field.
Structure example for 'crm.team' model for returned importable_fields::
[
{'name': 'message_ids', 'string': 'Messages', 'model_name': 'crm.team', 'comodel_name': 'mail.message', 'fields': [
{'name': 'moderation_status', 'string': 'Moderation Status', 'model_name': 'mail.message', 'fields': []},
{'name': 'body', 'string': 'Contents', 'model_name': 'mail.message', 'fields' : []}
]},
{'name': 'name', 'string': 'Sales Team', 'model_name': 'crm.team', 'fields' : []}
]
:param str model: name of the model to get fields from
:param int depth: depth of recursion into o2m fields
"""
Model = self.env[model]
importable_fields = [{
'id': 'id',
'name': 'id',
'string': _("External ID"),
'required': False,
'fields': [],
'type': 'id',
'model_name': model,
}]
if not depth:
return importable_fields
model_fields = Model.fields_get(attributes=[
'string', 'required', 'type', 'readonly', 'relation',
'definition_record', 'definition_record_field',
])
blacklist = models.MAGIC_COLUMNS
for name, field in dict(model_fields).items():
if field['type'] != 'properties':
continue
definition_record = field['definition_record']
definition_record_field = field['definition_record_field']
target_model = Model.env[Model._fields[definition_record].comodel_name]
# Do not take into account the definition of archived parents,
# we do not import archived records most of the time.
definition_records = target_model.search_fetch(
[(definition_record_field, '!=', False)],
[definition_record_field, 'display_name'],
order='id', # Avoid complex order
)
for record in definition_records:
for definition in record[definition_record_field]:
definition_type = definition['type']
if (
definition_type == 'separator' or
(
definition_type in ('many2one', 'many2many')
and definition.get('comodel') not in Model.env
)
):
continue
id_field = f"{name}.{definition['name']}"
model_fields[id_field] = {
'type': definition_type,
'string': _(
"%(property_string)s (%(parent_name)s)",
property_string=definition['string'], parent_name=record.display_name,
),
}
if definition_type in ('many2one', 'many2many'):
model_fields[id_field]['relation'] = definition['comodel']
for name, field in model_fields.items():
if name in blacklist:
continue
if field.get('readonly'):
continue
field_value = {
'id': name,
'name': name,
'string': field['string'],
# Y U NO ALWAYS HAS REQUIRED
'required': bool(field.get('required')),
'fields': [],
'type': field['type'],
'model_name': model,
}
if field['type'] in ('many2many', 'many2one'):
field_value['fields'] = [
dict(field_value, name='id', string=_("External ID"), type='id'),
dict(field_value, name='.id', string=_("Database ID"), type='id'),
]
field_value['comodel_name'] = field['relation']
elif field['type'] == 'one2many':
field_value['fields'] = self.get_fields_tree(field['relation'], depth=depth-1)
if self.env.user.has_group('base.group_no_one'):
field_value['fields'].append({'id': '.id', 'name': '.id', 'string': _("Database ID"), 'required': False, 'fields': [], 'type': 'id'})
field_value['comodel_name'] = field['relation']
importable_fields.append(field_value)
return importable_fields
def _filter_fields_by_types(self, model_fields_tree, header_types):
""" Remove from model_fields_tree param all the fields and subfields
that do not match the types in header_types
:param list[dict] model_fields_tree: Contains recursively all the importable fields of the target model.
Generated in :meth:`get_fields_tree`.
:param list header_types: Contains the extracted field types of the current header.
Generated in :meth:`_extract_header_types`.
"""
most_likely_fields_tree = []
for field in model_fields_tree:
subfields = field.get('fields')
if subfields:
filtered_field = dict(field) # Avoid modifying fields.
filtered_field['fields'] = self._filter_fields_by_types(subfields, header_types)
most_likely_fields_tree.append(filtered_field)
elif field.get('type') in header_types:
most_likely_fields_tree.append(field)
return most_likely_fields_tree
def _read_file(self, options):
""" Dispatch to specific method to read file content, according to its mimetype or file type
:param dict options: reading options (quoting, separator, ...)
"""
self.ensure_one()
e = None
# guess mimetype from file content
mimetype = guess_mimetype(self.file or b'')
(file_extension, handler, req) = FILE_TYPE_DICT.get(mimetype, (None, None, None))
if handler:
try:
return getattr(self, '_read_' + file_extension)(options)
except (ImportValidationError, ValueError):
raise
except Exception as exc: # noqa: BLE001
e = read_file_failed(exc, f"Unable to read file {self.file_name or '<unknown>'!r} as {file_extension!r} (guessed using mimetype {mimetype!r}).")
# try reading with user-provided mimetype
(file_extension, handler2, req2) = FILE_TYPE_DICT.get(self.file_type, (None, None, None))
if handler2 and handler2 != handler:
try:
return getattr(self, '_read_' + file_extension)(options)
except (ImportValidationError, ValueError):
raise
except Exception as exc: # noqa: BLE001
e = read_file_failed(exc, f"Unable to read file {self.file_name or '<unknown>'!r} as {file_extension!r} (decided from user-provided mimetype {self.file_type!r}).")
# fallback on file extensions as mime types can be unreliable (e.g.
# software setting incorrect mime types, or non-installed software
# leading to browser not sending mime types)
if self.file_name:
_stem, ext = os.path.splitext(self.file_name)
if (h := EXTENSIONS.get(ext)) and h != handler and h != handler2:
try:
return getattr(self, '_read_' + ext[1:])(options)
except (ImportValidationError, ValueError):
raise
except Exception as exc: # noqa: BLE001
e = read_file_failed(exc, f"Unable to read file {self.file_name!r} as {file_extension!r} (decided from file extension {ext!r}).")
if e is not None:
raise e
if req2 or req:
raise UserError(_("Unable to load \"{extension}\" file: requires Python module \"{modname}\"").format(extension=file_extension, modname=req2 or req))
raise UserError(_("Unsupported file format \"{}\", import only supports CSV, ODS, XLS and XLSX").format(self.file_type))
def _read_xls(self, options):
book = xlrd.open_workbook(file_contents=self.file or b'')
sheets = options['sheets'] = book.sheet_names()
sheet = options['sheet'] = options.get('sheet') or sheets[0]
return self._read_xls_book(book, sheet)
def _read_xls_book(self, book, sheet_name):
sheet = book.sheet_by_name(sheet_name)
rows = []
# emulate Sheet.get_rows for pre-0.9.4
for rowx, row in enumerate(map(sheet.row, range(sheet.nrows)), 1):
values = []
for colx, cell in enumerate(row, 1):
if cell.ctype is xlrd.XL_CELL_NUMBER:
is_float = cell.value % 1 != 0.0
values.append(
str(cell.value)
if is_float
else str(int(cell.value))
)
elif cell.ctype is xlrd.XL_CELL_DATE:
is_datetime = cell.value % 1 != 0.0
# emulate xldate_as_datetime for pre-0.9.3
dt = datetime.datetime(*xlrd.xldate.xldate_as_tuple(cell.value, book.datemode))
values.append(
dt.strftime(DEFAULT_SERVER_DATETIME_FORMAT)
if is_datetime
else dt.strftime(DEFAULT_SERVER_DATE_FORMAT)
)
elif cell.ctype is xlrd.XL_CELL_BOOLEAN:
values.append(u'True' if cell.value else u'False')
elif cell.ctype is xlrd.XL_CELL_ERROR:
raise ValueError(
_("Invalid cell value at row %(row)s, column %(col)s: %(cell_value)s") % {
'row': rowx,
'col': colx,
'cell_value': xlrd.error_text_from_code.get(cell.value, _("unknown error code %s", cell.value))
}
)
else:
values.append(cell.value)
if any(x for x in values if x.strip()):
rows.append(values)
# return the file length as first value
return sheet.nrows, rows
# reuse the xls reader for xlsx files when xlrd's xlsx support is available, otherwise fall back to openpyxl
def _read_xlsx(self, options):
if xlsx:
return self._read_xls(options)
import openpyxl.cell.cell as types
import openpyxl.styles.numbers as styles # noqa: PLC0415
book = load_workbook(io.BytesIO(self.file or b''), data_only=True)
sheets = options['sheets'] = book.sheetnames
sheet_name = options['sheet'] = options.get('sheet') or sheets[0]
sheet = book[sheet_name]
rows = []
for rowx, row in enumerate(sheet.rows, 1):
values = []
for colx, cell in enumerate(row, 1):
if cell.data_type is types.TYPE_ERROR:
raise ValueError(
_("Invalid cell value at row %(row)s, column %(col)s: %(cell_value)s", row=rowx, col=colx, cell_value=cell.value)
)
if cell.value is None:
values.append('')
elif isinstance(cell.value, float):
if cell.value % 1 == 0:
values.append(str(int(cell.value)))
else:
values.append(str(cell.value))
elif cell.is_date:
d_fmt = styles.is_datetime(cell.number_format)
if d_fmt == "datetime":
values.append(cell.value.strftime(DEFAULT_SERVER_DATETIME_FORMAT))
elif d_fmt == "date":
values.append(cell.value.strftime(DEFAULT_SERVER_DATE_FORMAT))
else:
raise ValueError(
_("Invalid cell format at row %(row)s, column %(col)s: %(cell_value)s, with format: %(cell_format)s, as (%(format_type)s) formats are not supported.", row=rowx, col=colx, cell_value=cell.value, cell_format=cell.number_format, format_type=d_fmt)
)
else:
values.append(str(cell.value))
if any(x.strip() for x in values):
rows.append(values)
return sheet.max_row, rows
def _read_ods(self, options):
doc = odf_ods_reader.ODSReader(file=io.BytesIO(self.file or b''))
sheets = options['sheets'] = list(doc.SHEETS.keys())
sheet = options['sheet'] = options.get('sheet') or sheets[0]
content = [
row
for row in doc.getSheet(sheet)
if any(x for x in row if x.strip())
]
# return the file length as first value
return len(content), content
def _read_csv(self, options):
""" Returns file length and a CSV-parsed list of all non-empty lines in the file.
:raises csv.Error: if an error is detected during CSV parsing
"""
csv_data = self.file or b''
if not csv_data:
return ()
encoding = options.get('encoding')
if not encoding:
encoding = options['encoding'] = chardet.detect(csv_data)['encoding'].lower()
# some versions of chardet (e.g. 2.3.0 but not 3.x) will return
# utf-(16|32)(le|be), which for python means "ignore / don't strip
# BOM". We don't want that, so rectify the encoding to non-marked
# IFF the guessed encoding is LE/BE and csv_data starts with a BOM
bom = BOM_MAP.get(encoding)
if bom and csv_data.startswith(bom):
encoding = options['encoding'] = encoding[:-2]
csv_text = csv_data.decode(encoding)
separator = options.get('separator')
if not separator:
# default for unspecified separator so user gets a message about
# having to specify it
separator = ','
for candidate in (',', ';', '\t', ' ', '|', unicodedata.lookup('unit separator')):
# pass through the CSV and check that all rows have the same
# length and are at least 2 columns wide; if so, assume this is the correct separator
it = csv.reader(io.StringIO(csv_text), quotechar=options['quoting'], delimiter=candidate)
w = None
for row in it:
width = len(row)
if w is None:
w = width
if width == 1 or width != w:
break # next candidate
else: # nobreak
separator = options['separator'] = candidate
break
if len(options['quoting']) != 1:
raise ImportValidationError(_("Error while importing records: Text Delimiter should be a single character."))
csv_iterator = csv.reader(
io.StringIO(csv_text),
quotechar=options['quoting'],
delimiter=separator)
content = [
row for row in csv_iterator
if any(x for x in row if x.strip())
]
# return the file length as first value
return len(content), content
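# Illustrative example (hypothetical data, not from the original source): with
# quoting='"' and no explicit separator, the sniffing loop above tries the
# candidates in order; for a file such as
#   name;email
#   Bob;bob@example.com
#   Alice;alice@example.com
# splitting on ',' yields 1-wide rows (rejected) while ';' yields consistent
# 2-wide rows, so options['separator'] ends up as ';'.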
@api.model
def _extract_header_types(self, preview_values, options):
""" Returns the potential field types, based on the preview values, using heuristics.
This method is only used for mapping suggestions at 2 levels:
1. for fuzzy mapping at file load -> Execute the fuzzy mapping only
on "most likely field types"
2. For "Suggested fields" section in the fields mapping dropdown list at UI side.
The following heuristic is used: If all preview values
- Start with ``__export__``: return id + relational field types
- Can be cast into integer: return integer, float and monetary (plus boolean if the values are only 0/1)
- Can be cast into Boolean: return boolean
- Can be cast into float: return float, monetary
- Can be cast into date/datetime: return date / datetime
- Cannot be cast into any of the previous types: return only text based fields
:param preview_values: list of value for the column to determine
see :meth:`parse_preview` for more details.
:param options: parsing options
"""
values = set(preview_values)
# If all preview values are empty, the column can map to any field
if values == {''}:
return ['all']
# If all values start with __export__, this is probably an id
if all(v.startswith('__export__') for v in values):
return ['id', 'many2many', 'many2one', 'one2many']
# If all values can be cast to int, the type is either integer, float or monetary
# Exception: if we only have 1 and 0, it can also be a boolean
if all(v.isdigit() for v in values if v):
field_type = ['integer', 'float', 'monetary']
if {'0', '1', ''}.issuperset(values):
field_type.append('boolean')
return field_type
# If all values are either True or False, type is boolean
if all(val.lower() in ('true', 'false', 't', 'f', '') for val in preview_values):
return ['boolean']
# If all values can be cast to float, type is either float or monetary
try:
thousand_separator = decimal_separator = False
for val in preview_values:
val = val.strip()
if not val:
continue
# value might have the currency symbol to the left or right of the value
val = self._remove_currency_symbol(val)
if val:
if options.get('float_thousand_separator') and options.get('float_decimal_separator'):
if options['float_decimal_separator'] == '.' and val.count('.') > 1:
# This is not a float so exit this try
float('a')
val = val.replace(options['float_thousand_separator'], '').replace(options['float_decimal_separator'], '.')
# We are now sure that this is a float, but we still need to find the
# thousand and decimal separator
else:
if val.count('.') > 1:
options['float_thousand_separator'] = '.'
options['float_decimal_separator'] = ','
elif val.count(',') > 1:
options['float_thousand_separator'] = ','
options['float_decimal_separator'] = '.'
elif val.find('.') > val.find(','):
thousand_separator = ','
decimal_separator = '.'
elif val.find(',') > val.find('.'):
thousand_separator = '.'
decimal_separator = ','
else:
# This is not a float so exit this try
float('a')
if thousand_separator and not options.get('float_decimal_separator'):
options['float_thousand_separator'] = thousand_separator
options['float_decimal_separator'] = decimal_separator
return ['float', 'monetary'] # Allow float to be mapped on a text field.
except ValueError:
pass
results = self._try_match_date_time(preview_values, options)
if results:
return results
# If not boolean, date/datetime, float or integer, only suggest text based fields.
return ['text', 'char', 'binary', 'selection', 'html', 'tags']
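# Illustrative behaviour of the heuristic above (hypothetical preview values):
#   ['0', '1', '']          -> ['integer', 'float', 'monetary', 'boolean']
#   ['__export__.res_partner_1', '__export__.res_partner_2']
#                           -> ['id', 'many2many', 'many2one', 'one2many']
#   ['12.5', '7.25']        -> ['float', 'monetary']
#   ['hello', 'world']      -> ['text', 'char', 'binary', 'selection', 'html', 'tags']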
def _try_match_date_time(self, preview_values, options):
# Or a date/datetime if it matches the pattern
date_patterns = [options['date_format']] if options.get(
'date_format') else []
user_date_format = self.env['res.lang']._get_data(code=self.env.user.lang).date_format
if user_date_format:
try:
to_re(user_date_format)
date_patterns.append(user_date_format)
except KeyError:
pass
date_patterns.extend(DATE_PATTERNS)
match = check_patterns(date_patterns, preview_values)
if match:
options['date_format'] = match
return ['date', 'datetime']
datetime_patterns = [options['datetime_format']] if options.get(
'datetime_format') else []
datetime_patterns.extend(
"%s %s" % (d, t)
for d in date_patterns
for t in TIME_PATTERNS
)
match = check_patterns(datetime_patterns, preview_values)
if match:
options['datetime_format'] = match
return ['datetime']
return []
@api.model
def _extract_headers_types(self, headers, preview, options):
"""
For each column, this method will extract the potential data types based on the preview values
:param list headers: list of headers names. Used as part of key for
returned headers_types to ease understanding of its usage
:param list preview: list of the first file records (see "parse_preview" for more detail) e.g.::
[ ["lead_name1", "1", "partner_id1"], ["lead_name2", "2", "partner_id2"], ... ]
:param options: parsing options
:returns: dict headers_types:
contains all the extracted header types for each header e.g.::
{
(header_index, header_name): ["char", "text", ...],
...
}
"""
headers_types = {}
for column_index, header_name in enumerate(headers):
preview_values = [record[column_index].strip() for record in preview]
type_field = self._extract_header_types(preview_values, options)
headers_types[(column_index, header_name)] = type_field
return headers_types
def _get_mapping_suggestion(self, header, fields_tree, header_types, mapping_fields):
""" Attempts to match a given header to a field of the imported model.
We can distinguish 2 types of header format:
- a simple header string that aims to directly match a field of the target model
e.g.: "lead_id" or "Opportunities" or "description".
- a composed, '/'-joined header string that aims to match a field of a
relation field of the target model (= subfield) e.g.:
'lead_id/description' aim to match the field ``description`` of the field lead_id.
When returning result, to ease further treatments, the result is
returned as a list, where each element of the list is a field or
a sub-field of the preceding field.
- ``["lead_id"]`` for simple case = simple matching
- ``["lead_id", "description"]`` for composed case = hierarchy matching
Mapping suggestion is found using the following heuristic:
- first we check if there was a saved mapping by the user
- then try to make an exact match on the field technical name /
english label / translated label
- finally, try the "fuzzy match": word distance between the header
title and the field technical name / english label / translated
label, using the lowest result. The field used for the fuzzy match
are based on the field types we extracted from the header data
(see :meth:`_extract_header_types`).
For subfields, use the same logic.
Word distance is a score between 0 and 1 to express the distance
between two char strings where ``0`` denotes an exact match and
``1`` indicates completely different strings
In order to keep only one column matched per field, we return the
distance. That distance will be used during the deduplicate process
(see :meth:`_deduplicate_mapping_suggestions`) and only the
mapping with the smallest distance will be kept in case of multiple
mapping on the same field. Note that we don't need to return the
distance in case of hierarchy mapping as we consider that an
advanced behaviour. The deduplicate process will ignore hierarchy
mapping. In case of duplicate mappings on sub-fields, the user will have
to manually select which field each column should map to.
:param str header: header name from the file
:param list fields_tree: list of all the fields of the target model
Coming from :meth:`get_fields_tree`
e.g: ``[ { 'name': 'fieldName', 'string': 'fieldLabel', fields: [ { 'name': 'subfieldName', ...} ]} , ... ]``
:param list header_types: Extracted field types for each column in the parsed file, based on its data content.
Coming from :meth:`_extract_header_types`
e.g.: ``['int', 'float', 'char', 'many2one', ...]``
:param dict mapping_fields: contains the previously saved mapping between header and field for the current model.
E.g.: ``{ header_name: field_name }``
:returns: if the header couldn't be matched: an empty dict
else: a dict with the field path and the distance between header and the matched field.
:rtype: ``dict(field_path + Word distance)``
In case of simple matching: ``{'field_path': [field_name], distance: word_distance}``
e.g.: ``{'field_path': ['lead_id'], distance: 0.23254}``
In case of hierarchy matching: ``{'field_path': [parent_field_name, child_field_name, subchild_field_name]}``
e.g.: ``{'field_path': ['lead_id', 'description']}``
"""
if not fields_tree:
return {}
# First, check in saved mapped fields
mapping_field_name = mapping_fields.get(header.lower())
if mapping_field_name:
return {
'field_path': mapping_field_name.split('/'),
'distance': -1 # Trick to force to keep that match during mapping deduplication.
}
if '/' not in header:
IrModelFieldsUs = self.with_context(lang='en_US').env['ir.model.fields']
for field in fields_tree:
fname = field['name']
# exact match found based on the field technical name
if header.casefold() == fname.casefold():
break
# match found using either the user translation or the model-defined field label
if header.casefold() == field['string'].casefold():
break
field_strings_en = IrModelFieldsUs.get_field_string(field['model_name'])
if fname in field_strings_en and header.casefold() == field_strings_en[fname].casefold():
break
else:
field = None
if field: # found an exact match, no need to go further
return {
'field_path': [field['name']],
'distance': 0
}
# If no match found, try fuzzy match on fields filtered based on extracted header types
# Filter out fields with types that do not match the corresponding header types.
filtered_fields = self._filter_fields_by_types(fields_tree, header_types)
if not filtered_fields:
return {}
min_dist = 1
min_dist_field = False
for field in filtered_fields:
fname = field['name']
# use string distance for fuzzy match only on most likely field types
distances = [
self._get_distance(header.casefold(), fname.casefold()),
self._get_distance(header.casefold(), field['string'].casefold()),
]
if field_string_en := IrModelFieldsUs.get_field_string(field['model_name']).get(fname):
distances.append(
self._get_distance(header.casefold(), field_string_en.casefold()),
)
# Keep only the closest mapping suggestion. Note that in case of multiple mapping on the same field,
# a mapping suggestion could be canceled by another one that has a smaller distance on the same field.
# See '_deduplicate_mapping_suggestions' method for more info.
current_field_dist = min(distances)
if current_field_dist < min_dist:
min_dist_field = fname
min_dist = current_field_dist
if min_dist < self.FUZZY_MATCH_DISTANCE:
return {
'field_path': [min_dist_field],
'distance': min_dist
}
return {}
# relational field path
field_path = []
subfields_tree = fields_tree
# Iteratively dive into fields tree
for sub_header in header.split('/'):
# Strip sub_header in case spaces are added around '/' for
# readability of paths
# Skip Saved mapping (mapping_field = {})
match = self._get_mapping_suggestion(sub_header.strip(), subfields_tree, header_types, {})
# Any match failure, exit
if not match:
return {}
# prep subfields for next iteration using match['field_path'][0]
field_name = match['field_path'][0]
subfields_tree = next(item['fields'] for item in subfields_tree if item['name'] == field_name)
field_path.append(field_name)
# No need to return distance for hierarchy mapping
return {'field_path': field_path}
def _get_distance(self, a, b):
""" This method return an index that reflects the distance between the
two given string a and b.
This index is a score between 0 and 1 where ``0`` indicates an exact
match and ``1`` indicates completely different strings.
"""
return 1 - difflib.SequenceMatcher(None, a, b).ratio()
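# Illustrative values (hypothetical strings): a near-identical pair such as
#   self._get_distance('salesperson', 'sales person')  ->  ~0.04
# stays well under FUZZY_MATCH_DISTANCE (0.2), while an unrelated pair such as
#   self._get_distance('notes', 'phone')               ->  ~0.6
# is far above it, so no fuzzy suggestion would be made for it.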
def _get_mapping_suggestions(self, headers, header_types, fields_tree):
""" Attempts to match the imported model's fields to the
titles of the parsed CSV file, if the file is supposed to have
headers.
Returns a dict mapping cell indices to key paths in the ``fields`` tree.
:param list headers: titles of the parsed file
:param dict header_types:
extracted types for each column in the parsed file e.g.::
{
(header_index, header_name): ['int', 'float', 'char', 'many2one',...],
...
}
:param list fields_tree:
list of the target model's fields e.g.::
[
{
'name': 'fieldName',
'string': 'fieldLabel',
'fields': [{ 'name': 'subfieldName', ...}]
},
...
]
:rtype: dict[(int, str), {'field_path': list[str], 'distance': int}]
:returns: mapping_suggestions e.g.:
.. code-block:: python
{
(header_index, header_name): {
'field_path': ['child_id','name'],
'distance': 0
},
...
}
"""
mapping_suggestions = {}
mapping_records = self.env['base_import.mapping'].search_read([('res_model', '=', self.res_model)], ['column_name', 'field_name'])
mapping_fields = {rec['column_name']: rec['field_name'] for rec in mapping_records}
for index, header in enumerate(headers):
match_field = self._get_mapping_suggestion(header, fields_tree, header_types[(index, header)], mapping_fields)
mapping_suggestions[(index, header)] = match_field or None
self._deduplicate_mapping_suggestions(mapping_suggestions)
return mapping_suggestions
def _deduplicate_mapping_suggestions(self, mapping_suggestions):
""" This method is meant to avoid multiple columns to be matched on the same field.
Taking ``mapping_suggestions`` as input, it will check if multiple
columns are mapped to the same field and will only keep the mapping
that has the smallest distance. The other columns that were matched
to the same field are removed from the mapping suggestions.
Hierarchy mapping is considered as advanced and is skipped during this
deduplication process. We consider that multiple mappings on hierarchy
mappings will not occur often, and since selecting a non 'char/text' field
more than once in the UI does not cause any particular issue, we keep
only the last selected mapping. The
objective is to lighten the mapping suggestion process as much as we can.
:param dict mapping_suggestions: ``{ (column_index, header_name) : { 'field_path': [header_name], 'distance': word_distance }}``
"""
min_dist_per_field = {}
headers_to_keep = []
for header, suggestion in mapping_suggestions.items():
if suggestion is None or len(suggestion['field_path']) > 1:
headers_to_keep.append(header)
continue
field_name = suggestion['field_path'][0]
field_distance = suggestion['distance']
best_distance, _best_header = min_dist_per_field.get(field_name, (1, None))
if field_distance < best_distance:
min_dist_per_field[field_name] = (field_distance, header)
headers_to_keep = headers_to_keep + [value[1] for value in min_dist_per_field.values()]
for header in mapping_suggestions.keys() - headers_to_keep:
del mapping_suggestions[header]
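# Illustrative example (hypothetical headers): if both (0, 'Name') with
# distance 0.0 and (3, 'Reference') with distance 0.15 were suggested for the
# same field 'name', only the (0, 'Name') suggestion is kept; a hierarchy
# suggestion such as ['partner_id', 'name'] is left untouched.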
def parse_preview(self, options, count=10):
""" Generates a preview of the uploaded files, and performs
fields-matching between the import's file data and the model's
columns.
If the headers are not requested (not options.has_headers),
returned ``matches`` and ``headers`` are both ``False``.
:param int count: number of preview lines to generate
:param options: format-specific options.
CSV: {quoting, separator, headers}
:type options: {str, str, str, bool}
:returns: ``{fields, matches, headers, preview} | {error, preview}``
:rtype: {dict(str: dict(...)), dict(int, list(str)), list(str), list(list(str))} | {str, str}
"""
self.ensure_one()
fields_tree = self.get_fields_tree(self.res_model)
try:
file_length, rows = self._read_file(options)
if file_length <= 0:
raise ImportValidationError(_("Import file has no content or is corrupt"))
preview = rows[:count]
# Get file headers
if options.get('has_headers') and preview:
# We need the header types before matching columns to fields
headers = preview.pop(0)
header_types = self._extract_headers_types(headers, preview, options)
else:
header_types, headers = {}, []
# Get matches: the ones already selected by the user or propose a new matching.
matches = {}
# If user checked to the advanced mode, we re-parse the file but we keep the mapping "as is".
# No need to make another mapping proposal
if options.get('keep_matches') and options.get('fields'):
for index, match in enumerate(options.get('fields', [])):
if match:
matches[index] = match.split('/')
elif options.get('has_headers'):
matches = self._get_mapping_suggestions(headers, header_types, fields_tree)
# remove header_name from matches keys as tuples are not supported in json.
# and remove distance from suggestion (keep only the field path) as not used at client side.
matches = {
header_key[0]: suggestion['field_path']
for header_key, suggestion in matches.items()
if suggestion
}
# compute whether we should activate advanced mode or not:
# if it was already activated or if the file contains "relational fields".
if options.get('keep_matches'):
advanced_mode = options.get('advanced')
else:
# Check if a header label contains a relational field
has_relational_header = any(len(models.fix_import_export_id_paths(col)) > 1 for col in headers)
# Check if any matched field is a relational field
has_relational_match = any(len(match) > 1 for field, match in matches.items() if match)
advanced_mode = has_relational_header or has_relational_match
# Take the first non-null values of each column to show a preview to users.
# Initially, the first non-null value is displayed to the user.
# On hover, the preview consists of 5 values.
column_example = []
for column_index, _unused in enumerate(preview[0]):
vals = []
for record in preview:
if record[column_index]:
vals.append("%s%s" % (record[column_index][:50], "..." if len(record[column_index]) > 50 else ""))
if len(vals) == 5:
break
column_example.append(
vals or
[""] # blank value if no example have been found at all for the current column
)
# Batch management
batch = False
batch_cutoff = options.get('limit')
if batch_cutoff:
if count > batch_cutoff:
batch = len(preview) > batch_cutoff
else:
batch = bool(next(
itertools.islice(rows, batch_cutoff - count, None),
None
))
return {
'fields': fields_tree,
'matches': matches or False,
'headers': headers or False,
'header_types': list(header_types.values()) or False,
'preview': column_example,
'options': options,
'advanced_mode': advanced_mode,
'debug': self.env.user.has_group('base.group_no_one'),
'batch': batch,
'file_length': len(rows),
}
except Exception as error:
# Due to lazy generators, UnicodeDecodeError (for
# instance) may only be raised when serializing the
# preview to a list in the return.
_logger.debug("Error during parsing preview", exc_info=True)
preview = None
if self.file_type == 'text/csv' and self.file:
preview = self.file[:ERROR_PREVIEW_BYTES].decode('iso-8859-1')
return {
'error': str(error),
# iso-8859-1 ensures decoding will always succeed,
# even if it yields non-printable characters. This is
# in case of UnicodeDecodeError (or csv.Error
# compounded with UnicodeDecodeError)
'preview': preview,
}
@api.model
def _convert_import_data(self, fields, options):
""" Extracts the input BaseModel and fields list (with
``False``-y placeholders for fields to *not* import) into a
format Model.import_data can use: a fields list without holes
and the precisely matching data matrix
:param list(str|bool) fields:
:returns: (data, fields)
:rtype: (list(list(str)), list(str))
:raises ValueError: in case the import data could not be converted
"""
# Get indices for non-empty fields
indices = [index for index, field in enumerate(fields) if field]
if not indices:
raise ImportValidationError(_("You must configure at least one field to import"))
# If only one index, itemgetter will return an atom rather
# than a 1-tuple
if len(indices) == 1:
mapper = lambda row: [row[indices[0]]]
else:
mapper = operator.itemgetter(*indices)
# Get only list of actually imported fields
import_fields = [f for f in fields if f]
_file_length, rows_to_import = self._read_file(options)
if len(rows_to_import[0]) != len(fields):
raise ImportValidationError(
_(
"Error while importing records: all rows should be of the same size, but the title row has %(title_row_entries)d entries while the first row has %(first_row_entries)d. You may need to change the separator character.",
title_row_entries=len(fields),
first_row_entries=len(rows_to_import[0]),
),
)
if options.get('has_headers'):
rows_to_import = rows_to_import[1:]
data = [
list(row) for row in map(mapper, rows_to_import)
# don't try inserting completely empty rows (e.g. from
# filtering out o2m fields)
if any(row)
]
# slicing needs to happen after filtering out empty rows as the
# data offsets from load are post-filtering
return data[options.get('skip'):], import_fields
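# Illustrative example (hypothetical mapping): with
#   fields = ['name', False, 'partner_id/id']
# only columns 0 and 2 of each row are kept, import_fields becomes
# ['name', 'partner_id/id'], the header row (if any) and fully empty rows are
# dropped, and the first ``skip`` remaining rows (if requested) are discarded.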
@api.model
def _remove_currency_symbol(self, value):
value = value.strip()
negative = False
# Careful: some countries use () for negative amounts, so replace them with a - sign
if value.startswith('(') and value.endswith(')'):
value = value[1:-1]
negative = True
float_regex = re.compile(r'([+-]?[0-9.,]+)')
split_value = [g for g in float_regex.split(value) if g]
if len(split_value) > 2:
# This is probably not a float
return False
if len(split_value) == 1:
if float_regex.search(split_value[0]) is not None:
return split_value[0] if not negative else '-' + split_value[0]
return False
else:
# String has been split in 2, locate which index contains the float and which does not
currency_index = 0
if float_regex.search(split_value[0]) is not None:
currency_index = 1
# Check that currency exists
currency = self.env['res.currency'].search([('symbol', '=', split_value[currency_index].strip())])
if len(currency):
return split_value[(currency_index + 1) % 2] if not negative else '-' + split_value[(currency_index + 1) % 2]
# Otherwise it is not a float with a currency symbol
return False
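# Illustrative behaviour (assumes a res.currency record with symbol '$' exists;
# values are hypothetical):
#   self._remove_currency_symbol('$ 1,234.56')  ->  '1,234.56'
#   self._remove_currency_symbol('(100)')       ->  '-100'
#   self._remove_currency_symbol('N/A')         ->  False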
@api.model
def _parse_float_from_data(self, data, index, name, options):
for line in data:
line[index] = line[index].strip()
if not line[index]:
continue
thousand_separator, decimal_separator = self._infer_separators(line[index], options)
if 'E' in line[index] or 'e' in line[index]:
tmp_value = line[index].replace(thousand_separator, '.')
try:
tmp_value = '{:f}'.format(float(tmp_value))
line[index] = tmp_value
thousand_separator = ' '
except Exception:
pass
line[index] = line[index].replace(thousand_separator, '').replace(decimal_separator, '.')
old_value = line[index]
line[index] = self._remove_currency_symbol(line[index])
if line[index] is False:
raise ImportValidationError(_("Column %(column)s contains incorrect values (value: %(value)s)", column=name, value=old_value), field=name)
def _infer_separators(self, value, options):
""" Try to infer the shape of the separators: if there are two
different "non-numberic" characters in the number, the
former/duplicated one would be grouping ("thousands" separator) and
the latter would be the decimal separator. The decimal separator
should furthermore be unique.
"""
# can't use \p{Sc} using re so handroll it
non_number = [
# any character
c for c in value
# which is not a numeric decoration (() is used for negative
# by accountants)
if c not in '()-+'
# which is not a digit or a currency symbol
if unicodedata.category(c) not in ('Nd', 'Sc')
]
counts = collections.Counter(non_number)
# if we have two non-numbers *and* the last one has a count of 1,
# we probably have grouping & decimal separators
if len(counts) == 2 and counts[non_number[-1]] == 1:
return [character for character, _count in counts.most_common()]
# otherwise get whatever's in the options, or fallback to a default
thousand_separator = options.get('float_thousand_separator', ' ')
decimal_separator = options.get('float_decimal_separator', '.')
return thousand_separator, decimal_separator
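# Illustrative behaviour (hypothetical values): for '1.234.567,89' the repeated
# '.' is inferred as the grouping separator and the single trailing ',' as the
# decimal separator (returned as ['.', ',']); for '12.5', which has no
# duplicated separator, the method falls back to the options or to (' ', '.').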
def _parse_import_data(self, data, import_fields, options):
""" Lauch first call to :meth:`_parse_import_data_recursive` with an
empty prefix. :meth:`_parse_import_data_recursive` will be run
recursively for each relational field.
"""
return self._parse_import_data_recursive(self.res_model, '', data, import_fields, options)
def _parse_import_data_recursive(self, model, prefix, data, import_fields, options):
# Get fields of type date/datetime
all_fields = self.env[model].fields_get()
for name, field in all_fields.items():
name = prefix + name
if field['type'] in ('date', 'datetime') and name in import_fields:
index = import_fields.index(name)
self._parse_date_from_data(data, index, name, field['type'], options)
# Check if the field is in import_fields and is relational (followed by /).
# Also verify that the field name exactly matches the import_field at the correct level.
elif any(name + '/' in import_field and name == import_field.split('/')[prefix.count('/')] for import_field in import_fields):
# Recursive call with the relational as new model and add the field name to the prefix
self._parse_import_data_recursive(field['relation'], name + '/', data, import_fields, options)
elif field['type'] in ('float', 'monetary') and name in import_fields:
# Parse floats: values from the file sometimes carry a currency symbol or use () to denote a negative value.
# We should be able to manage both cases.
index = import_fields.index(name)
self._parse_float_from_data(data, index, name, options)
elif field['type'] == 'binary' and field.get('attachment') and any(f in name for f in IMAGE_FIELDS) and name in import_fields:
index = import_fields.index(name)
with requests.Session() as session:
session.stream = True
for num, line in enumerate(data):
if re.match(config.get("import_image_regex", DEFAULT_IMAGE_REGEX), line[index]):
if not self.env.user._can_import_remote_urls():
raise ImportValidationError(
_("You can not import images via URL, check with your administrator or support for the reason."),
field=name, field_type=field['type']
)
line[index] = self._import_image_by_url(line[index], session, name, num)
elif '.' in line[index]:
# Detect if it's a filename
pass
else:
try:
base64.b64decode(line[index], validate=True)
except ValueError:
raise ImportValidationError(
_("Found invalid image data, images should be imported as either URLs or base64-encoded data."),
field=name, field_type=field['type']
)
return data
def _parse_date_from_data(self, data, index, name, field_type, options):
dt = datetime.datetime
fmt = fields.Date.to_string if field_type == 'date' else fields.Datetime.to_string
d_fmt = options.get('date_format') or DEFAULT_SERVER_DATE_FORMAT
dt_fmt = options.get('datetime_format') or DEFAULT_SERVER_DATETIME_FORMAT
for num, line in enumerate(data):
if not line[index]:
continue
v = line[index].strip()
try:
# first try parsing as a datetime if it's one
if dt_fmt and field_type == 'datetime':
try:
line[index] = fmt(dt.strptime(v, dt_fmt))
continue
except ValueError:
pass
# otherwise try parsing as a date whether it's a date
# or datetime
line[index] = fmt(dt.strptime(v, d_fmt))
except ValueError as e:
raise ImportValidationError(
_("Column %(column)s contains incorrect values. Error in line %(line)d: %(error)s", column=name, line=num + 1, error=e),
field=name, field_type=field_type
)
except Exception as e:
raise ImportValidationError(
_("Error Parsing Date [%(field)s:L%(line)d]: %(error)s", field=name, line=num + 1, error=e),
field=name, field_type=field_type
)
def _import_image_by_url(self, url, session, field, line_number):
""" Imports an image by URL
:param str url: the original field value
:param requests.Session session:
:param str field: name of the field (for logging/debugging)
:param int line_number: 0-indexed line number within the imported file (for logging/debugging)
:return: the replacement value
:rtype: bytes
"""
maxsize = int(config.get("import_image_maxbytes", DEFAULT_IMAGE_MAXBYTES))
_logger.debug("Trying to import image from URL: %s into field %s, at line %s" % (url, field, line_number))
try:
response = session.get(url, timeout=int(config.get("import_image_timeout", DEFAULT_IMAGE_TIMEOUT)))
response.raise_for_status()
if response.headers.get('Content-Length') and int(response.headers['Content-Length']) > maxsize:
raise ImportValidationError(
_("File size exceeds configured maximum (%s bytes)", maxsize),
field=field
)
content = bytearray()
for chunk in response.iter_content(DEFAULT_IMAGE_CHUNK_SIZE):
content += chunk
if len(content) > maxsize:
raise ImportValidationError(
_("File size exceeds configured maximum (%s bytes)", maxsize),
field=field
)
image = Image.open(io.BytesIO(content))
w, h = image.size
if w * h > 42e6: # Nokia Lumia 1020 photo resolution
raise ImportValidationError(
_("Image size excessive, imported images must be smaller than 42 million pixel"),
field=field
)
return base64.b64encode(content)
except Exception as e:
_logger.warning(e, exc_info=True)
raise ImportValidationError(_("Could not retrieve URL: %(url)s [%(field_name)s: L%(line_number)d]: %(error)s") % {
'url': url,
'field_name': field,
'line_number': line_number + 1,
'error': e
})
def execute_import(self, fields, columns, options, dryrun=False):
""" Actual execution of the import
:param fields: import mapping: maps each column to a field,
``False`` for the columns to ignore
:type fields: list(str|bool)
:param columns: columns label
:type columns: list(str|bool)
:param dict options:
:param bool dryrun: performs all import operations (and
validations) but rolls back the writes, which allows
collecting as many errors as possible without
the risk of clobbering the database.
:returns: a dict with the imported record ids (``ids``) and the
list of import messages (``messages``). If that list is empty
the import executed fully and correctly. If it is
non-empty it contains dicts with 3 keys:
``type``
the type of error (``error|warning``)
``message``
the error message associated with the error (a string)
``record``
the data which failed to import (or ``false`` if that data
isn't available or provided)
:rtype: dict(ids: list(int), messages: list({type, message, record}))
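Illustrative (hypothetical) result of a successful two-record import:

.. code-block:: python

    {
        'ids': [42, 43],
        'messages': [],
        'name': ['Record A', 'Record B'],
        'nextrow': 0,
    }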
"""
self.ensure_one()
self._cr.execute('SAVEPOINT import')
try:
input_file_data, import_fields = self._convert_import_data(fields, options)
# Parse date and float field
input_file_data = self._parse_import_data(input_file_data, import_fields, options)
except ImportValidationError as error:
return {'messages': [error.__dict__]}
_logger.info('importing %d rows...', len(input_file_data))
binary_filenames = self._extract_binary_filenames(import_fields, input_file_data)
import_fields, merged_data = self._handle_multi_mapping(import_fields, input_file_data)
if options.get('fallback_values'):
merged_data = self._handle_fallback_values(import_fields, merged_data, options['fallback_values'])
name_create_enabled_fields = options.pop('name_create_enabled_fields', {})
import_limit = options.pop('limit', None)
model = self.env[self.res_model].with_context(
import_file=True,
name_create_enabled_fields=name_create_enabled_fields,
import_set_empty_fields=options.get('import_set_empty_fields', []),
import_skip_records=options.get('import_skip_records', []),
_import_limit=import_limit)
import_result = model.load(import_fields, merged_data)
_logger.info('done')
# If transaction aborted, RELEASE SAVEPOINT is going to raise
# an InternalError (ROLLBACK should work, maybe). Ignore that.
# TODO: to handle multiple errors, create savepoint around
# write and release it in case of write error (after
# adding error to errors array) => can keep on trying to
# import stuff, and rollback at the end if there is any
# error in the results.
try:
if dryrun:
self._cr.execute('ROLLBACK TO SAVEPOINT import')
# cancel all changes done to the registry/ormcache
# we need to clear the cache in case any created id was added to an ormcache and would be missing afterward
self.pool.clear_all_caches()
# don't propagate to other workers since it was rolled back
self.pool.reset_changes()
else:
self._cr.execute('RELEASE SAVEPOINT import')
except psycopg2.InternalError:
pass
# Insert/update mapping columns when the import completes successfully
if import_result['ids'] and options.get('has_headers'):
BaseImportMapping = self.env['base_import.mapping']
for index, column_name in enumerate(columns):
if column_name:
# Update to latest selected field
mapping_domain = [('res_model', '=', self.res_model), ('column_name', '=', column_name)]
column_mapping = BaseImportMapping.search(mapping_domain, limit=1)
if column_mapping:
if column_mapping.field_name != fields[index]:
column_mapping.field_name = fields[index]
else:
BaseImportMapping.create({
'res_model': self.res_model,
'column_name': column_name,
'field_name': fields[index]
})
if 'name' in import_fields:
index_of_name = import_fields.index('name')
skipped = options.get('skip', 0)
# pad front as data doesn't contain anything for skipped lines
r = import_result['name'] = [''] * skipped
# only add names for the window being imported
r.extend(x[index_of_name] for x in input_file_data[:import_limit])
# pad back (though that's probably not useful)
r.extend([''] * (len(input_file_data) - (import_limit or 0)))
else:
import_result['name'] = []
skip = options.get('skip', 0)
# convert load's internal nextrow to the imported file's
if import_result['nextrow']: # don't update if nextrow = 0 (= no nextrow)
import_result['nextrow'] += skip
if binary_filenames:
import_result['binary_filenames'] = binary_filenames
return import_result
def _extract_binary_filenames(self, import_fields, data, model=False, prefix='', binary_filenames=False):
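""" Collect, per binary image field, the filenames found in ``data``.
Filename-like values are blanked out of ``data`` and recorded here so the
caller can expose them through the import result; URL and base64 values
are left untouched and recorded as ``None``. Relational sub-fields are
handled recursively through ``prefix``.

Illustrative (hypothetical) return value::

    defaultdict(list, {'image_1920': ['logo.png', None, 'banner.jpg']})
"""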
model = model or self.res_model
binary_filenames = binary_filenames or defaultdict(list)
for name, field in self.env[model]._fields.items():
name = prefix + name
if any(name + '/' in import_field and name == import_field.split('/')[prefix.count('/')] for import_field in import_fields):
# Recursive call with the relational as new model and add the field name to the prefix
binary_filenames = self._extract_binary_filenames(import_fields, data, field.comodel_name, name + '/', binary_filenames)
elif field.type == 'binary' and field.attachment and any(f in name for f in IMAGE_FIELDS) and name in import_fields:
index = import_fields.index(name)
for line in data:
filename = None
value = line[index]
if isinstance(value, str):
if re.match(config.get("import_image_regex", DEFAULT_IMAGE_REGEX), value):
pass
elif '.' in value:
# Detect if it's a filename
filename = value
line[index] = ''
# else: base64 data, nothing to do
binary_filenames[name].append(filename)
return binary_filenames
def _handle_multi_mapping(self, import_fields, input_file_data):
""" This method handles multiple mapping on the same field.
It will return the list of the mapped fields and the concatenated data for each field:
- If two columns are mapped to the same text or char field, they will end up
in a single column, concatenated with a space (char) or a newline (text).
- The same logic is used for many2many fields. Multiple values can be
imported if they are separated by ``,``.
Input/output Example:
input data
.. code-block:: python
[
["Value part 1", "1", "res.partner_id1", "Value part 2"],
["I am", "1", "res.partner_id1", "Batman"],
]
import_fields
``[desc, some_number, partner, desc]``
output merged_data
.. code-block:: python
[
["Value part 1 Value part 2", "1", "res.partner_id1"],
["I am Batman", "1", "res.partner_id1"],
]
fields
``[desc, some_number, partner]``
"""
# Get fields and their occurrences indexes
# Among the fields that have been mapped, we get their corresponding mapped column indexes,
# as the same field may have been mapped to several columns.
mapped_field_indexes = {}
for idx, field in enumerate(field for field in import_fields if field):
mapped_field_indexes.setdefault(field, list()).append(idx)
import_fields = list(mapped_field_indexes.keys())
# recreate data and merge duplicates (applies only to text, char and many2many fields)
# Also handles multi-mapping on "field of relation fields".
merged_data = []
for record in input_file_data:
new_record = []
for fields, indexes in mapped_field_indexes.items():
split_fields = fields.split('/')
target_field = split_fields[-1]
# get target_field type (on target model)
target_model = self.res_model
for field in split_fields:
# if not on the last hierarchy level, retarget the model.
# Also check that the field exists, to silently ignore properties fields
# since we don't have their definition here anyway.
if field != target_field and field in self.env[target_model]:
target_model = self.env[target_model][field]._name
field = self.env[target_model]._fields.get(target_field)
field_type = field.type if field else ''
# merge data if necessary
if field_type == 'char':
new_record.append(' '.join(record[idx] for idx in indexes if record[idx]))
elif field_type == 'text':
new_record.append('\n'.join(record[idx] for idx in indexes if record[idx]))
elif field_type == 'many2many':
new_record.append(','.join(record[idx] for idx in indexes if record[idx]))
else:
new_record.append(record[indexes[0]])
merged_data.append(new_record)
return import_fields, merged_data
def _handle_fallback_values(self, import_field, input_file_data, fallback_values):
"""
If there are fallback values, this method will replace the input file
data value if it does not match the possible values for the given field.
This is only valid for boolean and selection fields.
.. note::
We need to retrieve the selection values for all the fields in
fallback_values: if a field is present there, it is because a conflict
already occurred during the first import run and the user had to
select a fallback value for that field.
:param list import_field: ordered list of fields that have been matched to the import data
:param list input_file_data: ordered list of value lists to import into the given import_field
:param dict fallback_values:
contains all the fields that have been tagged by the user to use a
specific fallback value in case the value to import does not match
values accepted by the field (selection or boolean) e.g.::
{
'fieldName': {
'fallback_value': fallback_value,
'field_model': field_model,
'field_type': field_type
},
'state': {
'fallback_value': 'draft',
'field_model': field_model,
'field_type': 'selection'
},
'active': {
'fallback_value': 'true',
'field_model': field_model,
'field_type': 'boolean'
}
}
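For instance, with the ``state`` entry above (illustrative values)::

    input cell:  'unknown value'   # not among the selection values
    output cell: 'draft'           # replaced by the fallback value
                                   # (a fallback of 'skip' leaves the cell unset)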
"""
# add possible selection values into our fallback dictionary for fields of type "selection"
for field_string in fallback_values:
if fallback_values[field_string]['field_type'] != "selection":
continue
field_path = field_string.split('/')
target_field = field_path[-1]
target_model = self.env[fallback_values[field_string]['field_model']]
selection_values = [value.lower() for (key, value) in target_model.fields_get([target_field])[target_field]['selection']]
fallback_values[field_string]['selection_values'] = selection_values
# check fallback values
for record_index, records in enumerate(input_file_data):
for column_index, value in enumerate(records):
field = import_field[column_index]
if field in fallback_values:
fallback_value = fallback_values[field]['fallback_value']
# Boolean
if fallback_values[field]['field_type'] == "boolean":
value = value if value.lower() in ('0', '1', 'true', 'false') else fallback_value
# Selection
elif fallback_values[field]['field_type'] == "selection" and value.lower() not in fallback_values[field]["selection_values"]:
value = fallback_value if fallback_value != 'skip' else None # don't set any value if we skip
input_file_data[record_index][column_index] = value
return input_file_data
_SEPARATORS = [' ', '/', '-', '.', '']
_PATTERN_BASELINE = [
('%m', '%d', '%Y'),
('%d', '%m', '%Y'),
('%Y', '%m', '%d'),
('%Y', '%d', '%m'),
]
DATE_FORMATS = []
# take each baseline format and duplicate it, substituting the long
# year (%Y) with the short year (%y), so both variants are generated
for ps in _PATTERN_BASELINE:
patterns = {ps}
for s, t in [('%Y', '%y')]:
patterns.update([ # need listcomp: with genexpr "set changed size during iteration"
tuple(t if it == s else it for it in f)
for f in patterns
])
DATE_FORMATS.extend(patterns)
DATE_PATTERNS = [
sep.join(fmt)
for sep in _SEPARATORS
for fmt in DATE_FORMATS
]
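# The combination above yields patterns such as (illustrative subset):
# '%m/%d/%Y', '%d-%m-%y', '%Y.%m.%d', '%d %m %Y', '%Y%d%m'.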
TIME_PATTERNS = [
'%H:%M:%S', '%H:%M', '%H', # 24h
'%I:%M:%S %p', '%I:%M %p', '%I %p', # 12h
]
def check_patterns(patterns, values):
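""" Return the first pattern whose regex (see ``to_re``) matches every
non-empty value in ``values``, or None if no pattern matches them all.

Illustrative (hypothetical values)::

    check_patterns(DATE_PATTERNS, ['31/12/2021', '01/01/2022'])  # -> '%d/%m/%Y'
"""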
for pattern in patterns:
p = to_re(pattern)
for val in values:
if val and not p.match(val):
break
else: # no break, all match
return pattern
return None
def to_re(pattern):
""" cut down version of TimeRE converting strptime patterns to regex
"""
pattern = re.sub(r'\s+', r'\\s+', pattern)
pattern = re.sub('%([a-z])', _replacer, pattern, flags=re.IGNORECASE)
pattern = '^' + pattern + '$'
return re.compile(pattern, re.IGNORECASE)
def _replacer(m):
return _P_TO_RE[m.group(1)]
_P_TO_RE = {
'd': r"(3[0-1]|[1-2]\d|0[1-9]|[1-9]| [1-9])",
'H': r"(2[0-3]|[0-1]\d|\d)",
'I': r"(1[0-2]|0[1-9]|[1-9])",
'm': r"(1[0-2]|0[1-9]|[1-9])",
'M': r"([0-5]\d|\d)",
'S': r"(6[0-1]|[0-5]\d|\d)",
'y': r"(\d\d)",
'Y': r"(\d\d\d\d)",
'p': r"(am|pm)",
'%': '%',
}
def read_file_failed(exc: Exception, message: str) -> UserError:
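""" Log ``message`` with the original traceback and return a ``UserError``
that chains ``exc`` as its cause, so callers can raise a user-facing error
while keeping the underlying exception for debugging.
"""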
_logger.warning(message, exc_info=True)
e = UserError(message)
e.__cause__ = exc
return e