File: common.py

package info (click to toggle)
odoo 18.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 878,716 kB
  • sloc: javascript: 927,937; python: 685,670; xml: 388,524; sh: 1,033; sql: 415; makefile: 26
file content (417 lines) | stat: -rw-r--r-- 18,501 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

import datetime
import random
import re
import werkzeug

from unittest.mock import patch

from odoo.tools import email_normalize, mail
from odoo.addons.link_tracker.tests.common import MockLinkTracker
from odoo.addons.mail.tests.common import MailCase, MailCommon, mail_new_test_user
from odoo.sql_db import Cursor


class MassMailCase(MailCase, MockLinkTracker):

    # ------------------------------------------------------------
    # ASSERTS
    # ------------------------------------------------------------

    def assertMailingStatistics(self, mailing, **kwargs):
        """ Helper to assert mailing statistics fields. As we have many of them
        it helps lessening test asserts. """
        if not kwargs.get('expected'):
            kwargs['expected'] = len(mailing.mailing_trace_ids)
        if not kwargs.get('delivered'):
            kwargs['delivered'] = len(mailing.mailing_trace_ids)
        for fname in ['scheduled', 'expected', 'sent', 'delivered',
                      'opened', 'replied', 'clicked',
                      'canceled', 'failed', 'bounced']:
            self.assertEqual(
                mailing[fname], kwargs.get(fname, 0),
                'Mailing %s statistics failed: got %s instead of %s' % (fname, mailing[fname], kwargs.get(fname, 0))
            )

    def assertMailTraces(self, recipients_info, mailing, records,
                         check_mail=True, sent_unlink=False,
                         author=None, mail_links_info=None):
        """ Check content of traces. Traces are fetched based on a given mailing
        and records. Their content is compared to recipients_info structure that
        holds expected information. Links content may be checked, notably to
        assert shortening or unsubscribe links. Mail.mail records may optionally
        be checked.

        :param recipients_info: list[{
            # TRACE
            'email': (normalized) email used when sending email and stored on
              trace. May be empty, computed based on partner;
            'failure_type': optional failure type;
            'failure_reason': optional failure reason;
            'partner': res.partner record (may be empty),
            'record: linked record,
            'trace_status': outgoing / sent / open / reply / bounce / error / cancel (sent by default),
            # MAIL.MAIL
            'content': optional content that should be present in mail.mail body_html;
            'email_to_mail': optional email used for the mail, when different from the
              one stored on the trace itself (see 'email_to' in assertMailMail);
            'email_to_recipients': optional email used ofr the outgoing email,
              see 'assertSentEmail';
            'failure_type': propagated from trace;
            'failure_reason': propagated from trace;
            'mail_values': other mail.mail values for assertMailMail;
            }, { ... }]

        :param mailing: a mailing.mailing record from which traces have been
          generated;
        :param records: records given to mailing that generated traces. It is
          used notably to find traces using their IDs;
        :param check_mail: if True, also check mail.mail records that should be
          linked to traces;
        :param sent_unlink: it True, sent mail.mail are deleted and we check gateway
          output result instead of actual mail.mail records;
        :param mail_links_info: if given, should follow order of ``recipients_info``
          and give details about links. See ``assertLinkShortenedHtml`` helper for
          more details about content to give;
        :param author: author of sent mail.mail;
        """
        # map trace state to email state
        state_mapping = {
            'sent': 'sent',
            'open': 'sent',  # opened implies something has been sent
            'reply': 'sent',  # replied implies something has been sent
            'error': 'exception',
            'cancel': 'cancel',
            'bounce': 'cancel',
        }

        traces = self.env['mailing.trace'].search([
            ('mass_mailing_id', 'in', mailing.ids),
            ('res_id', 'in', records.ids)
        ])
        debug_info = '\n'.join(
            f'Trace: to {t.email} - state {t.trace_status} - res_id {t.res_id}'
            for t in traces
        )

        # ensure trace coherency
        self.assertTrue(all(s.model == records._name for s in traces))
        self.assertEqual(set(s.res_id for s in traces), set(records.ids))

        # check each traces
        if not mail_links_info:
            mail_links_info = [None] * len(recipients_info)
        for recipient_info, link_info, record in zip(recipients_info, mail_links_info, records):
            # check input
            invalid = set(recipient_info.keys()) - {
                'content',
                'email', 'email_to_mail', 'email_to_recipients',
                'mail_values',
                'partner', 'record', 'trace_status',
                'failure_type', 'failure_reason',
            }
            if invalid:
                raise AssertionError(f"assertMailTraces: invalid input {invalid}")

            # recipients
            partner = recipient_info.get('partner', self.env['res.partner'])
            email = recipient_info.get('email')
            if email is None and partner:
                email = partner.email_normalized
            email_to_mail = recipient_info.get('email_to_mail') or email
            email_to_recipients = recipient_info.get('email_to_recipients')
            # trace
            failure_type = recipient_info.get('failure_type')
            failure_reason = recipient_info.get('failure_reason')
            status = recipient_info.get('trace_status', 'sent')
            # content
            content = recipient_info.get('content')
            record = record or recipient_info.get('record')

            recipient_trace = traces.filtered(
                lambda t: (t.email == email or (not email and not t.email)) and \
                          t.trace_status == status and \
                          (t.res_id == record.id if record else True)
            )
            self.assertTrue(
                len(recipient_trace) == 1,
                'MailTrace: email %s (recipient %s, status: %s, record: %s): found %s records (1 expected)\n%s' % (
                    email, partner, status, record,
                    len(recipient_trace), debug_info)
            )
            self.assertTrue(bool(recipient_trace.mail_mail_id_int))
            if 'failure_type' in recipient_info or status in ('error', 'cancel', 'bounce'):
                self.assertEqual(recipient_trace.failure_type, failure_type)
            if 'failure_reason' in recipient_info:
                self.assertEqual(recipient_trace.failure_reason, failure_reason)

            if check_mail:
                if author is None:
                    author = self.env.user.partner_id

                # mail.mail specific values to check
                fields_values = {'mailing_id': mailing}
                if recipient_info.get('mail_values'):
                    fields_values.update(recipient_info['mail_values'])
                if 'failure_type' in recipient_info:
                    fields_values['failure_type'] = failure_type
                if 'failure_reason' in recipient_info:
                    fields_values['failure_reason'] = failure_reason
                if 'email_to_mail' in recipient_info:
                    fields_values['email_to'] = recipient_info['email_to_mail']
                if partner:
                    fields_values['recipient_ids'] = partner

                # specific for partner: email_formatted is used
                if partner:
                    if status == 'sent' and sent_unlink:
                        self.assertSentEmail(author, [partner])
                    else:
                        self.assertMailMail(
                            partner, state_mapping[status],
                            author=author,
                            content=content,
                            email_to_recipients=email_to_recipients,
                            fields_values=fields_values,
                        )
                # specific if email is False -> could have troubles finding it if several falsy traces
                elif not email and status in ('cancel', 'bounce'):
                    self.assertMailMailWId(
                        recipient_trace.mail_mail_id_int, state_mapping[status],
                        author=author,
                        content=content,
                        email_to_recipients=email_to_recipients,
                        fields_values=fields_values,
                    )
                else:
                    self.assertMailMailWEmails(
                        [email_to_mail], state_mapping[status],
                        author=author,
                        content=content,
                        email_to_recipients=email_to_recipients,
                        fields_values=fields_values,
                    )

            if link_info:
                trace_mail = self._find_mail_mail_wrecord(record)
                for (anchor_id, url, is_shortened, add_link_params) in link_info:
                    link_params = {'utm_medium': 'Email', 'utm_source': mailing.name}
                    if add_link_params:
                        link_params.update(**add_link_params)
                    self.assertLinkShortenedHtml(
                        trace_mail.body_html,
                        (anchor_id, url, is_shortened),
                        link_params=link_params,
                    )

    # ------------------------------------------------------------
    # TOOLS
    # ------------------------------------------------------------

    def gateway_mail_trace_bounce(self, mailing, record, bounce_base_values=None):
        """ Generate a bounce at mailgateway level.

        :param mailing: a ``mailing.mailing`` record on which we find a trace
          to bounce;
        :param record: record which should bounce;
        :param bounce_base_values: optional values given to routing;
        """
        record_email = record[record._primary_email]
        trace = mailing.mailing_trace_ids.filtered(
            lambda t: t.model == record._name and t.res_id == record.id
        )
        self.assertTrue(trace)
        self.assertEqual(trace.email, email_normalize(record_email))

        parsed_bounce_values = {
            'email_from': 'some.email@external.example.com',  # TDE check: email_from -> trace email ?
            'to': 'bounce@test.example.com',  # TDE check: bounce alias ?
            'message_id': mail.generate_tracking_message_id('MailTest'),
            'bounced_partner': self.env['res.partner'].sudo(),
            'bounced_message': self.env['mail.message'].sudo(),
            'body': 'This is the bounce email',
        }
        if bounce_base_values:
            parsed_bounce_values.update(bounce_base_values)
        parsed_bounce_values.update({
            'bounced_email': trace.email,
            'bounced_msg_ids': [trace.message_id],
        })
        self.env['mail.thread']._routing_handle_bounce(False, parsed_bounce_values)
        return trace

    def gateway_mail_trace_click(self, mailing, record, click_label):
        """ Simulate a click on a sent email.

        :param mailing: a ``mailing.mailing`` record on which we find a trace
          to click;
        :param record: record which should click;
        :param click_label: label of link on which we should click;
        """
        record_email = record[record._primary_email]
        trace = mailing.mailing_trace_ids.filtered(
            lambda t: t.model == record._name and t.res_id == record.id
        )
        self.assertTrue(trace)
        self.assertEqual(trace.email, email_normalize(record_email))

        email = self._find_sent_mail_wemail(trace.email)
        self.assertTrue(bool(email))
        for (_url_href, link_url, _dummy, label) in re.findall(mail.HTML_TAG_URL_REGEX, email['body']):
            if label == click_label and '/r/' in link_url:  # shortened link, like 'http://localhost:8069/r/LBG/m/53'
                parsed_url = werkzeug.urls.url_parse(link_url)
                path_items = parsed_url.path.split('/')
                code, trace_id = path_items[2], int(path_items[4])
                self.assertEqual(trace.id, trace_id)

                self.env['link.tracker.click'].sudo().add_click(
                    code,
                    ip='100.200.300.%3f' % random.random(),
                    country_code='BE',
                    mailing_trace_id=trace.id
                )
                break
        else:
            raise AssertionError('url %s not found in mailing %s for record %s' % (click_label, mailing, record))
        return trace

    def gateway_mail_trace_open(self, mailing, record):
        """ Simulate opening an email through blank.gif icon access. As we
        don't want to use the whole Http layer just for that we will just
        call 'set_opened()' on trace, until having a better option.

        :param mailing: a ``mailing.mailing`` record on which we find a trace
          to open;
        :param record: record which should open;
        """
        trace = mailing.mailing_trace_ids.filtered(
            lambda t: t.model == record._name and t.res_id == record.id
        )
        self.assertTrue(trace)

        trace.set_opened()
        return trace

    def gateway_mail_trace_reply(self, mailing, record):
        """ Simulate replying to an email. As we don't want to use the whole
        mail and gateway layer just for that we will just call 'set_replied()'
        on trace.

        :param mailing: a ``mailing.mailing`` record on which we find a trace
          to open;
        :param record: record which should open;
        """
        trace = mailing.mailing_trace_ids.filtered(
            lambda t: t.model == record._name and t.res_id == record.id
        )
        self.assertTrue(trace)

        trace.set_replied()
        return trace

    @classmethod
    def _create_bounce_trace(cls, mailing, records, dt=None):
        if dt is None:
            dt = datetime.datetime.now() - datetime.timedelta(days=1)
        return cls._create_traces(mailing, records, dt, trace_status='bounce')

    @classmethod
    def _create_sent_traces(cls, mailing, records, dt=None):
        if dt is None:
            dt = datetime.datetime.now() - datetime.timedelta(days=1)
        return cls._create_traces(mailing, records, dt, trace_status='sent')

    @classmethod
    def _create_traces(cls, mailing, records, dt, **values):
        if 'email_normalized' in records:
            fname = 'email_normalized'
        elif 'email_from' in records:
            fname = 'email_from'
        else:
            fname = 'email'
        randomized = random.random()
        # Cursor.now() uses transaction's timestamp and not datetime lib -> freeze_time
        # is not sufficient
        with patch.object(Cursor, 'now', lambda *args, **kwargs: dt):
            traces = cls.env['mailing.trace'].sudo().create([
                dict({'mass_mailing_id': mailing.id,
                      'model': record._name,
                      'res_id': record.id,
                      'trace_status': values.get('trace_status', 'bounce'),
                      # TDE FIXME: improve this with a mail-enabled heuristics
                      'email': record[fname],
                      'message_id': '<%5f@gilbert.boitempomils>' % randomized,
                     }, **values)
                for record in records
            ])
        return traces

    @classmethod
    def _create_mailing_list(cls):
        """ Shortcut to create mailing lists. Currently hardcoded, maybe evolve
        in a near future. """
        cls.mailing_list_1, cls.mailing_list_2, cls.mailing_list_3, cls.mailing_list_4 = cls.env['mailing.list'].with_context(cls._test_context).create([
            {
                'contact_ids': [
                    (0, 0, {'name': 'Déboulonneur', 'email': 'fleurus@example.com'}),
                    (0, 0, {'name': 'Gorramts', 'email': 'gorramts@example.com'}),
                    (0, 0, {'name': 'Ybrant', 'email': 'ybrant@example.com'}),
                ],
                'name': 'List1',
                'is_public': True,
            }, {
                'contact_ids': [
                    (0, 0, {'name': 'Gilberte', 'email': 'gilberte@example.com'}),
                    (0, 0, {'name': 'Gilberte En Mieux', 'email': 'gilberte@example.com'}),
                    (0, 0, {'name': 'Norbert', 'email': 'norbert@example.com'}),
                    (0, 0, {'name': 'Ybrant', 'email': 'ybrant@example.com'}),
                ],
                'name': 'List2',
                'is_public': True,
            }, {
                'contact_ids': [
                    (0, 0, {'name': 'Déboulonneur', 'email': 'fleurus@example.com'}),
                ],
                'name': 'List3',
                'is_public': True,
            }, {
                'name': 'List4',
            }
        ])
        cls.mailing_list_3.subscription_ids[0].opt_out = True

    @classmethod
    def _create_mailing_list_of_x_contacts(cls, contacts_nbr):
        """ Shortcut to create a mailing list that contains a defined number
        of contacts. """
        return cls.env['mailing.list'].with_context(cls._test_context).create({
            'name': 'Test List',
            'contact_ids': [
                (0, 0, {
                    'name': f'Contact %{idx}',
                    'email': f'contact%{idx}@example.com'
                })
                for idx in range(contacts_nbr)
            ],
        })


class MassMailCommon(MailCommon, MassMailCase):

    @classmethod
    def setUpClass(cls):
        super(MassMailCommon, cls).setUpClass()

        cls.user_marketing = mail_new_test_user(
            cls.env,
            groups='base.group_user,base.group_partner_manager,mass_mailing.group_mass_mailing_user',
            login='user_marketing',
            name='Martial Marketing',
            signature='--\nMartial',
        )

        cls.email_reply_to = 'MyCompany SomehowAlias <test.alias@test.mycompany.com>'

        cls.env.flush_all()