File: mailing_contact_import.py

package info (click to toggle)
odoo 18.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 878,716 kB
  • sloc: javascript: 927,937; python: 685,670; xml: 388,524; sh: 1,033; sql: 415; makefile: 26
file content (132 lines) | stat: -rw-r--r-- 5,125 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

from odoo import fields, models, tools, Command, _
from odoo.tools.misc import clean_context


class MailingContactImport(models.TransientModel):
    _name = 'mailing.contact.import'
    _description = 'Mailing Contact Import'

    mailing_list_ids = fields.Many2many('mailing.list', string='Lists')
    contact_list = fields.Text('Contact List', help='Contact list that will be imported, one contact per line')

    def action_import(self):
        """Import each lines of "contact_list" as a new contact."""
        self.ensure_one()
        contacts = tools.mail.email_split_tuples(', '.join((self.contact_list or '').splitlines()))
        if not contacts:
            return {
                'type': 'ir.actions.client',
                'tag': 'display_notification',
                'params': {
                    'message': _('No valid email address found.'),
                    'next': {'type': 'ir.actions.act_window_close'},
                    'sticky': False,
                    'type': 'warning',
                }
            }

        if len(contacts) > 5000:
            return {
                'type': 'ir.actions.client',
                'tag': 'display_notification',
                'params': {
                    'message': _('You have to much emails, please upload a file.'),
                    'type': 'warning',
                    'sticky': False,
                    'next': self.action_open_base_import(),
                }
            }

        all_emails = list({values[1].lower() for values in contacts})

        existing_contacts = self.env['mailing.contact'].search([
            ('email_normalized', 'in', all_emails),
            ('list_ids', 'in', self.mailing_list_ids.ids),
        ])
        existing_contacts = {
            contact.email_normalized: contact
            for contact in existing_contacts
        }

        # Remove duplicated record, keep only the first non-empty name for each email address
        unique_contacts = {}
        for name, email in contacts:
            email = email.lower()
            if unique_contacts.get(email, {}).get('name'):
                continue

            if email in existing_contacts and not self.mailing_list_ids < existing_contacts[email].list_ids:
                existing_contacts[email].list_ids |= self.mailing_list_ids
            if email not in existing_contacts:
                unique_contacts[email] = {
                    'name': name,
                    'subscription_ids': [
                        Command.create({'list_id': mailing_list_id.id})
                        for mailing_list_id in self.mailing_list_ids
                    ],
                }

        if not unique_contacts:
            return {
                'type': 'ir.actions.client',
                'tag': 'display_notification',
                'params': {
                    'message': _('No contacts were imported. All email addresses are already in the mailing list.'),
                    'next': {'type': 'ir.actions.act_window_close'},
                    'sticky': False,
                    'type': 'warning',
                }
            }

        new_contacts = self.env['mailing.contact'].with_context(clean_context(self.env.context)).create([
            {
                'email': email,
                **values,
            }
            for email, values in unique_contacts.items()
        ])

        if ignored := len(contacts) - len(unique_contacts):
            message = _(
                "Contacts successfully imported. Number of contacts imported: %(imported_count)s. Number of duplicates ignored: %(duplicate_count)s",
                imported_count=len(unique_contacts),
                duplicate_count=ignored,
            )
        else:
            message = _("Contacts successfully imported. Number of contacts imported: %(imported_count)s", imported_count=len(unique_contacts))

        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'message': message,
                'type': 'success',
                'sticky': False,
                'next': {
                    'context': self.env.context,
                    'domain': [('id', 'in', new_contacts.ids)],
                    'name': _('New contacts imported'),
                    'res_model': 'mailing.contact',
                    'type': 'ir.actions.act_window',
                    'view_mode': 'list',
                    'views': [[False, 'list'], [False, 'form']],
                },
            }
        }

    def action_open_base_import(self):
        """Open the base import wizard to import mailing list contacts with a xlsx file."""
        self.ensure_one()

        return {
            'type': 'ir.actions.client',
            'tag': 'import',
            'name': _('Import Mailing Contacts'),
            'params': {
                'context': self.env.context,
                'active_model': 'mailing.contact',
            }
        }