1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
|
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
import random
import string
from werkzeug import urls
from odoo import tools, models, fields, api, _
from odoo.addons.mail.tools import link_preview
from odoo.exceptions import UserError
from odoo.osv import expression
from odoo.tools.mail import validate_url
LINK_TRACKER_UNIQUE_FIELDS = ('url', 'campaign_id', 'medium_id', 'source_id', 'label')
_logger = logging.getLogger(__name__)
LINK_TRACKER_MIN_CODE_LENGTH = 3
class LinkTracker(models.Model):
""" Link trackers allow users to wrap any URL into a short URL that can be
tracked by Odoo. Clicks are counter on each link. A tracker is linked to
UTMs allowing to analyze marketing actions.
This model is also used in mass_mailing where each link in html body is
automatically converted into a short link that is tracked and integrates
UTMs. """
_name = "link.tracker"
_rec_name = "short_url"
_description = "Link Tracker"
_order = "count DESC"
_inherit = ["utm.mixin"]
# URL info
url = fields.Char(string='Target URL', required=True)
absolute_url = fields.Char("Absolute URL", compute="_compute_absolute_url")
short_url = fields.Char(string='Tracked URL', compute='_compute_short_url')
redirected_url = fields.Char(string='Redirected URL', compute='_compute_redirected_url')
short_url_host = fields.Char(string='Host of the short URL', compute='_compute_short_url_host')
title = fields.Char(string='Page Title', store=True)
label = fields.Char(string='Button label')
# Tracking
link_code_ids = fields.One2many('link.tracker.code', 'link_id', string='Codes')
code = fields.Char(string='Short URL code', compute='_compute_code', inverse="_inverse_code", readonly=False)
link_click_ids = fields.One2many('link.tracker.click', 'link_id', string='Clicks')
count = fields.Integer(string='Number of Clicks', compute='_compute_count', store=True)
# UTMs - enforcing the fact that we want to 'set null' when relation is unlinked
campaign_id = fields.Many2one(ondelete='set null')
medium_id = fields.Many2one(ondelete='set null')
source_id = fields.Many2one(ondelete='set null')
@api.depends("url")
def _compute_absolute_url(self):
for tracker in self:
url = urls.url_parse(tracker.url)
if url.scheme:
tracker.absolute_url = tracker.url
else:
tracker.absolute_url = urls.url_join(tracker.get_base_url(), url)
@api.depends('link_click_ids.link_id')
def _compute_count(self):
clicks_data = self.env['link.tracker.click']._read_group(
[('link_id', 'in', self.ids)],
['link_id'],
['__count'],
)
mapped_data = {link.id: count for link, count in clicks_data}
for tracker in self:
tracker.count = mapped_data.get(tracker.id, 0)
@api.depends('code')
def _compute_short_url(self):
for tracker in self:
tracker.short_url = urls.url_join(tracker.short_url_host or '', tracker.code or '')
def _compute_short_url_host(self):
for tracker in self:
tracker.short_url_host = tracker.get_base_url() + '/r/'
def _compute_code(self):
for tracker in self:
record = self.env['link.tracker.code'].search([('link_id', '=', tracker.id)], limit=1, order='id DESC')
tracker.code = record.code
def _inverse_code(self):
self.ensure_one()
if not self.code:
return
record = self.env['link.tracker.code'].search([('link_id', '=', self.id)], limit=1, order='id DESC')
if record:
record.code = self.code
@api.depends('url')
def _compute_redirected_url(self):
"""Compute the URL to which we will redirect the user.
By default, add UTM values as GET parameters. But if the system parameter
`link_tracker.no_external_tracking` is set, we add the UTM values in the URL
*only* for URLs that redirect to the local website (base URL).
"""
no_external_tracking = self.env['ir.config_parameter'].sudo().get_param('link_tracker.no_external_tracking')
for tracker in self:
base_domain = urls.url_parse(tracker.get_base_url()).netloc
parsed = urls.url_parse(tracker.url)
if no_external_tracking and parsed.netloc and parsed.netloc != base_domain:
tracker.redirected_url = parsed.to_url()
continue
utms = {}
for key, field_name, cook in self.env['utm.mixin'].tracking_fields():
field = self._fields[field_name]
attr = tracker[field_name]
if field.type == 'many2one':
attr = attr.name
if attr:
utms[key] = attr
utms.update(parsed.decode_query())
tracker.redirected_url = parsed.replace(query=urls.url_encode(utms)).to_url()
@api.model
@api.depends('url')
def _get_title_from_url(self, url):
preview = link_preview.get_link_preview_from_url(url)
if preview and preview.get('og_title'):
return preview['og_title']
return url
@api.constrains(*LINK_TRACKER_UNIQUE_FIELDS)
def _check_unicity(self):
"""Check that the link trackers are unique."""
def _format_value(tracker, field_name):
if field_name == 'label' and not tracker[field_name]:
return False
return tracker[field_name]
# build a query to fetch all needed link trackers at once
search_query = expression.OR([
expression.AND([
[('url', '=', tracker.url)],
[('campaign_id', '=', tracker.campaign_id.id)],
[('medium_id', '=', tracker.medium_id.id)],
[('source_id', '=', tracker.source_id.id)],
[('label', '=', tracker.label) if tracker.label else ('label', 'in', (False, ''))],
])
for tracker in self
])
# Can not be implemented with a SQL constraint because we care about null values.
potential_duplicates = self.search(search_query)
duplicates = self.browse()
seen = set()
for tracker in potential_duplicates:
unique_fields = tuple(_format_value(tracker, field_name) for field_name in LINK_TRACKER_UNIQUE_FIELDS)
if unique_fields in seen or seen.add(unique_fields):
duplicates += tracker
if duplicates:
error_lines = '\n- '.join(
str((tracker.url, tracker.campaign_id.name, tracker.medium_id.name, tracker.source_id.name, tracker.label or '""'))
for tracker in duplicates
)
raise UserError(
_('Combinations of Link Tracker values (URL, campaign, medium, source, and label) must be unique.\n'
'The following combinations are already used: \n- %(error_lines)s', error_lines=error_lines))
@api.model_create_multi
def create(self, vals_list):
vals_list = [vals.copy() for vals in vals_list]
for vals in vals_list:
if 'url' not in vals:
raise ValueError(_('Creating a Link Tracker without URL is not possible'))
if vals['url'].startswith(('?', '#')):
raise UserError(_("ā%sā is not a valid link, links cannot redirect to the current page.", vals['url']))
vals['url'] = validate_url(vals['url'])
if not vals.get('title'):
vals['title'] = self._get_title_from_url(vals['url'])
# Prevent the UTMs to be set by the values of UTM cookies
for (__, fname, __) in self.env['utm.mixin'].tracking_fields():
if fname not in vals:
vals[fname] = False
links = super(LinkTracker, self).create(vals_list)
link_tracker_codes = self.env['link.tracker.code']._get_random_code_strings(len(vals_list))
self.env['link.tracker.code'].sudo().create([
{
'code': code,
'link_id': link.id,
} for link, code in zip(links, link_tracker_codes)
])
return links
@api.model
def search_or_create(self, vals_list):
"""Get existing or newly created records matching vals_list items in preserved order supporting duplicates."""
if not isinstance(vals_list, list):
_logger.warning("Deprecated usage of LinkTracker.search_or_create which now expects a list of dictionaries as input.")
vals_list = [vals_list]
def _format_key(obj):
"""Generate unique 'key' of trackers, allowing to find duplicates."""
return tuple(
(field_name, obj[field_name].id if isinstance(obj[field_name], models.BaseModel) else obj[field_name])
for field_name in LINK_TRACKER_UNIQUE_FIELDS
)
def _format_key_domain(field_values):
"""Handle "label" being False / '' and be defensive."""
return expression.AND([
[(field_name, '=', value) if value or field_name != 'label' else ('label', 'in', (False, ''))]
for field_name, value in field_values
])
errors = set()
for vals in vals_list:
if 'url' not in vals:
raise ValueError(_('Creating a Link Tracker without URL is not possible'))
if vals['url'].startswith(('?', '#')):
errors.add(_("ā%sā is not a valid link, links cannot redirect to the current page.", vals['url']))
vals['url'] = validate_url(vals['url'])
# fill vals to use direct accessor in _format_key
self._add_missing_default_values(vals)
vals.update({key: False for key in LINK_TRACKER_UNIQUE_FIELDS if not vals.get(key)})
if errors:
raise UserError("\n".join(errors))
# Find unique keys of trackers, then fetch existing trackers
unique_keys = {_format_key(vals) for vals in vals_list}
found_trackers = self.search(expression.OR([_format_key_domain(key) for key in unique_keys]))
key_to_trackers_map = {_format_key(tracker): tracker for tracker in found_trackers}
if len(unique_keys) != len(found_trackers):
# Create trackers for values with unique keys not found
seen_keys = set(key_to_trackers_map.keys())
new_trackers = self.create([
vals for vals in vals_list
if (key := _format_key(vals)) not in seen_keys and not seen_keys.add(key)
])
key_to_trackers_map.update((_format_key(tracker), tracker) for tracker in new_trackers)
# Build final recordset following input order
return self.browse([key_to_trackers_map[_format_key(vals)].id for vals in vals_list])
@api.model
def convert_links(self, html, vals, blacklist=None):
raise NotImplementedError('Moved on mail.render.mixin')
def _convert_links_text(self, body, vals, blacklist=None):
raise NotImplementedError('Moved on mail.render.mixin')
def action_view_statistics(self):
action = self.env['ir.actions.act_window']._for_xml_id('link_tracker.link_tracker_click_action_statistics')
action['domain'] = [('link_id', '=', self.id)]
action['context'] = dict(self._context, create=False)
return action
def action_visit_page(self):
return {
'name': _("Visit Webpage"),
'type': 'ir.actions.act_url',
'url': self.url,
'target': 'new',
}
@api.model
def recent_links(self, filter, limit):
if filter == 'newest':
return self.search_read([], order='create_date DESC, id DESC', limit=limit)
elif filter == 'most-clicked':
return self.search_read([('count', '!=', 0)], order='count DESC, id DESC', limit=limit)
elif filter == 'recently-used':
return self.search_read([('count', '!=', 0)], order='write_date DESC, id DESC', limit=limit)
else:
return {'Error': "This filter doesn't exist."}
@api.model
def get_url_from_code(self, code):
code_rec = self.env['link.tracker.code'].sudo().search([('code', '=', code)])
if not code_rec:
return None
return code_rec.link_id.redirected_url
class LinkTrackerCode(models.Model):
_name = "link.tracker.code"
_description = "Link Tracker Code"
_rec_name = 'code'
code = fields.Char(string='Short URL Code', required=True, store=True)
link_id = fields.Many2one('link.tracker', 'Link', required=True, ondelete='cascade')
_sql_constraints = [
('code', 'unique( code )', 'Code must be unique.')
]
@api.model
def _get_random_code_strings(self, n=1):
size = LINK_TRACKER_MIN_CODE_LENGTH
while True:
code_propositions = [
''.join(random.choices(string.ascii_letters + string.digits, k=size))
for __ in range(n)
]
if len(set(code_propositions)) != n or self.search_count([('code', 'in', code_propositions)], limit=1):
size += 1
else:
return code_propositions
class LinkTrackerClick(models.Model):
_name = "link.tracker.click"
_rec_name = "link_id"
_description = "Link Tracker Click"
campaign_id = fields.Many2one(
'utm.campaign', 'UTM Campaign', index='btree_not_null',
related="link_id.campaign_id", store=True, ondelete="set null")
link_id = fields.Many2one(
'link.tracker', 'Link',
index=True, required=True, ondelete='cascade')
ip = fields.Char(string='Internet Protocol')
country_id = fields.Many2one('res.country', 'Country')
def _prepare_click_values_from_route(self, **route_values):
click_values = dict((fname, route_values[fname]) for fname in self._fields if fname in route_values)
if not click_values.get('country_id') and route_values.get('country_code'):
click_values['country_id'] = self.env['res.country'].search([('code', '=', route_values['country_code'])], limit=1).id
return click_values
@api.model
def add_click(self, code, **route_values):
""" Main API to add a click on a link. """
tracker_code = self.env['link.tracker.code'].search([('code', '=', code)])
if not tracker_code:
return None
route_values['link_id'] = tracker_code.link_id.id
click_values = self._prepare_click_values_from_route(**route_values)
return self.create(click_values)
|