1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import base64
import json
from markupsafe import Markup
from psycopg2 import IntegrityError
import re
from werkzeug.exceptions import BadRequest
from odoo import http, SUPERUSER_ID
from odoo.addons.base.models.ir_qweb_fields import nl2br, nl2br_enclose
from odoo.http import request
from odoo.tools import plaintext2html
from odoo.exceptions import AccessDenied, ValidationError, UserError
from odoo.tools.misc import hmac, consteq
from odoo.tools.translate import _, LazyTranslate
_lt = LazyTranslate(__name__)
class WebsiteForm(http.Controller):
@http.route('/website/form', type='http', auth="public", methods=['POST'], multilang=False)
def website_form_empty(self, **kwargs):
# This is a workaround to don't add language prefix to <form action="/website/form/" ...>
return ""
# Check and insert values from the form on the model <model>
@http.route('/website/form/<string:model_name>', type='http', auth="public", methods=['POST'], website=True, csrf=False)
def website_form(self, model_name, **kwargs):
# Partial CSRF check, only performed when session is authenticated, as there
# is no real risk for unauthenticated sessions here. It's a common case for
# embedded forms now: SameSite policy rejects the cookies, so the session
# is lost, and the CSRF check fails, breaking the post for no good reason.
csrf_token = request.params.pop('csrf_token', None)
if request.session.uid and not request.validate_csrf(csrf_token):
raise BadRequest('Session expired (invalid CSRF token)')
try:
# The except clause below should not let what has been done inside
# here be committed. It should not either roll back everything in
# this controller method. Instead, we use a savepoint to roll back
# what has been done inside the try clause.
with request.env.cr.savepoint():
if request.env['ir.http']._verify_request_recaptcha_token('website_form'):
# request.params was modified, update kwargs to reflect the changes
kwargs = dict(request.params)
kwargs.pop('model_name')
return self._handle_website_form(model_name, **kwargs)
error = _("Suspicious activity detected by Google reCaptcha.")
except (ValidationError, UserError) as e:
error = e.args[0]
return json.dumps({
'error': error,
})
def _handle_website_form(self, model_name, **kwargs):
model_record = request.env['ir.model'].sudo().search([('model', '=', model_name), ('website_form_access', '=', True)])
if not model_record:
return json.dumps({
'error': _("The form's specified model does not exist")
})
try:
data = self.extract_data(model_record, kwargs)
# If we encounter an issue while extracting data
except ValidationError as e:
# I couldn't find a cleaner way to pass data to an exception
return json.dumps({'error_fields': e.args[0]})
try:
id_record = self.insert_record(request, model_record, data['record'], data['custom'], data.get('meta'))
if id_record:
self.insert_attachment(model_record, id_record, data['attachments'])
# in case of an email, we want to send it immediately instead of waiting
# for the email queue to process
if model_name == 'mail.mail':
form_has_email_cc = {'email_cc', 'email_bcc'} & kwargs.keys() or \
'email_cc' in kwargs["website_form_signature"]
# remove the email_cc information from the signature
kwargs["website_form_signature"] = kwargs["website_form_signature"].split(':')[0]
if kwargs.get("email_to"):
value = kwargs['email_to'] + (':email_cc' if form_has_email_cc else '')
hash_value = hmac(model_record.env, 'website_form_signature', value)
if not consteq(kwargs["website_form_signature"], hash_value):
raise AccessDenied('invalid website_form_signature')
request.env[model_name].sudo().browse(id_record).send()
# Some fields have additional SQL constraints that we can't check generically
# Ex: crm.lead.probability which is a float between 0 and 1
# TODO: How to get the name of the erroneous field ?
except IntegrityError:
return json.dumps(False)
request.session['form_builder_model_model'] = model_record.model
request.session['form_builder_model'] = model_record.name
request.session['form_builder_id'] = id_record
return json.dumps({'id': id_record})
# Constants string to make metadata readable on a text field
_meta_label = _lt("Metadata") # Title for meta data
# Dict of dynamically called filters following type of field to be fault tolerent
def identity(self, field_label, field_input):
return field_input
def integer(self, field_label, field_input):
return int(field_input)
def floating(self, field_label, field_input):
return float(field_input)
def html(self, field_label, field_input):
return plaintext2html(field_input)
def boolean(self, field_label, field_input):
return bool(field_input)
def binary(self, field_label, field_input):
return base64.b64encode(field_input.read())
def one2many(self, field_label, field_input):
return [int(i) for i in field_input.split(',')]
def many2many(self, field_label, field_input, *args):
return [(args[0] if args else (6, 0)) + (self.one2many(field_label, field_input),)]
def tags(self, field_label, field_input):
# Unescape ',' and '\'
return [
tag.replace('\\,', ',').replace('\\/', '\\')
for tag in re.split(r'(?<!\\),', field_input)
]
_input_filters = {
'char': identity,
'text': identity,
'html': html,
'date': identity,
'datetime': identity,
'many2one': integer,
'one2many': one2many,
'many2many': many2many,
'selection': identity,
'boolean': boolean,
'integer': integer,
'float': floating,
'binary': binary,
'monetary': floating,
# Properties
'tags': tags,
}
# Extract all data sent by the form and sort its on several properties
def extract_data(self, model, values):
dest_model = request.env[model.sudo().model]
data = {
'record': {}, # Values to create record
'attachments': [], # Attached files
'custom': '', # Custom fields values
'meta': '', # Add metadata if enabled
}
authorized_fields = model.with_user(SUPERUSER_ID)._get_form_writable_fields(values)
error_fields = []
custom_fields = []
for field_name, field_value in values.items():
# First decode the field_name encoded at the client side.
field_name = re.sub('"', '"', field_name)
# If the value of the field if a file
if hasattr(field_value, 'filename'):
# Undo file upload field name indexing
field_name = field_name.split('[', 1)[0]
# If it's an actual binary field, convert the input file
# If it's not, we'll use attachments instead
if field_name in authorized_fields and authorized_fields[field_name]['type'] == 'binary':
data['record'][field_name] = base64.b64encode(field_value.read())
field_value.stream.seek(0) # do not consume value forever
if authorized_fields[field_name]['manual'] and field_name + "_filename" in dest_model:
data['record'][field_name + "_filename"] = field_value.filename
else:
field_value.field_name = field_name
data['attachments'].append(field_value)
# If it's a known field
elif field_name in authorized_fields:
try:
if '_property' in authorized_fields[field_name]:
# Collect all properties for a given property field in
# a list.
field_data = authorized_fields[field_name]
properties_field_name = field_data['_property']['field']
del field_data['_property']
properties = data['record'].setdefault(properties_field_name, [])
property_type = authorized_fields[field_name]['type']
# For properties, many2many is stored as an array of
# integers like one2many
filter_type = 'one2many' if property_type == 'many2many' else property_type
input_filter = self._input_filters[filter_type]
field_data['value'] = input_filter(self, field_name, field_value)
properties.append(field_data)
else:
input_filter = self._input_filters[authorized_fields[field_name]['type']]
data['record'][field_name] = input_filter(self, field_name, field_value)
except ValueError:
error_fields.append(field_name)
if dest_model._name == 'mail.mail' and field_name == 'email_from':
# As the "email_from" is used to populate the email_from of the
# sent mail.mail, it could be filtered out at sending time if no
# outgoing mail server "from_filter" match the sender email.
# To make sure the email contains that (important) information
# we also add it to the "custom message" that will be included
# in the body of the email sent.
custom_fields.append((_('email'), field_value))
# If it's a custom field
elif field_name not in ('context', 'website_form_signature'):
custom_fields.append((field_name, field_value))
data['custom'] = "\n".join([u"%s : %s" % v for v in custom_fields])
# Add metadata if enabled # ICP for retrocompatibility
if request.env['ir.config_parameter'].sudo().get_param('website_form_enable_metadata'):
environ = request.httprequest.headers.environ
data['meta'] += "%s : %s\n%s : %s\n%s : %s\n%s : %s\n" % (
"IP", environ.get("REMOTE_ADDR"),
"USER_AGENT", environ.get("HTTP_USER_AGENT"),
"ACCEPT_LANGUAGE", environ.get("HTTP_ACCEPT_LANGUAGE"),
"REFERER", environ.get("HTTP_REFERER")
)
# This function can be defined on any model to provide
# a model-specific filtering of the record values
# Example:
# def website_form_input_filter(self, values):
# values['name'] = '%s\'s Application' % values['partner_name']
# return values
if hasattr(dest_model, "website_form_input_filter"):
data['record'] = dest_model.website_form_input_filter(request, data['record'])
missing_required_fields = [label for label, field in authorized_fields.items() if field['required'] and label not in data['record']]
if any(error_fields):
raise ValidationError(error_fields + missing_required_fields)
return data
def _should_log_authenticate_message(self, record):
return True
def insert_record(self, request, model, values, custom, meta=None):
model_name = model.sudo().model
if model_name == 'mail.mail':
email_from = _('"%(company)s form submission" <%(email)s>', company=request.env.company.name, email=request.env.company.email)
values.update({'reply_to': values.get('email_from'), 'email_from': email_from})
record = request.env[model_name].with_user(SUPERUSER_ID).with_context(
mail_create_nosubscribe=True,
).create(values)
authenticate_message = False
email_field_name = request.env[model_name]._mail_get_primary_email_field()
if email_field_name and hasattr(record, '_message_log') and email_field_name in values:
warning_icon = ""
if request.session.uid:
user_email = request.env.user.email
form_email = values[email_field_name]
if user_email != form_email:
authenticate_message = _("This %(model_name)s was submitted by %(user_name)s (%(user_email)s) on behalf of %(form_email)s",
model_name=model.name, user_name=request.env.user.name, user_email=user_email, form_email=form_email)
elif self._should_log_authenticate_message(record):
warning_icon = "/!\\ "
authenticate_message = _("EXTERNAL SUBMISSION - Customer not verified")
if authenticate_message:
record._message_log(
body=Markup('<div class="alert alert-info" role="alert">{warning_icon}{message}</div>').format(warning_icon=warning_icon, message=authenticate_message),
)
if custom or meta or authenticate_message:
_custom_label = "%s\n___________\n\n" % _("Other Information:") # Title for custom fields
if model_name == 'mail.mail':
_custom_label = "%s\n___________\n\n" % _("This message has been posted on your website!")
default_field = model.website_form_default_field_id
default_field_data = values.get(default_field.name, '')
custom_label = _custom_label + custom if custom else ''
meta_label = self._meta_label + "\n________\n\n" + meta if meta else ''
custom_content = ''
for text in [authenticate_message, default_field_data, custom_label, meta_label]:
if not text:
continue
if custom_content:
custom_content += '\n\n'
custom_content += text
# If there is a default field configured for this model, use it.
# If there isn't, put the custom data in a message instead
if default_field.name:
if default_field.ttype == 'html' or model_name == 'mail.mail':
custom_content = nl2br_enclose(custom_content)
record.update({default_field.name: custom_content})
elif hasattr(record, '_message_log'):
record._message_log(
body=nl2br_enclose(custom_content, 'p'),
message_type='comment',
)
return record.id
# Link all files attached on the form
def insert_attachment(self, model, id_record, files):
orphan_attachment_ids = []
model_name = model.sudo().model
record = model.env[model_name].browse(id_record)
authorized_fields = model.with_user(SUPERUSER_ID)._get_form_writable_fields()
for file in files:
custom_field = file.field_name not in authorized_fields
attachment_value = {
'name': file.filename,
'datas': base64.encodebytes(file.read()),
'res_model': model_name,
'res_id': record.id,
}
attachment_id = request.env['ir.attachment'].sudo().create(attachment_value)
if attachment_id and not custom_field:
record_sudo = record.sudo()
value = [(4, attachment_id.id)]
if record_sudo._fields[file.field_name].type == 'many2one':
value = attachment_id.id
record_sudo[file.field_name] = value
else:
orphan_attachment_ids.append(attachment_id.id)
if model_name != 'mail.mail' and hasattr(record, '_message_log') and orphan_attachment_ids:
# If some attachments didn't match a field on the model,
# we create a mail.message to link them to the record
record._message_log(
attachment_ids=[(6, 0, orphan_attachment_ids)],
body=Markup(_('<p>Attached files: </p>')),
message_type='comment',
)
elif model_name == 'mail.mail' and orphan_attachment_ids:
# If the model is mail.mail then we have no other choice but to
# attach the custom binary field files on the attachment_ids field.
for attachment_id_id in orphan_attachment_ids:
record.attachment_ids = [(4, attachment_id_id)]
|