1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
|
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import json
import logging
import time
import requests
from werkzeug.urls import url_encode, url_join
from odoo import _, api, fields, models, tools
from odoo.exceptions import AccessError, UserError
_logger = logging.getLogger(__name__)
class GoogleGmailMixin(models.AbstractModel):
_name = 'google.gmail.mixin'
_description = 'Google Gmail Mixin'
_SERVICE_SCOPE = 'https://mail.google.com/'
google_gmail_authorization_code = fields.Char(string='Authorization Code', groups='base.group_system', copy=False)
google_gmail_refresh_token = fields.Char(string='Refresh Token', groups='base.group_system', copy=False)
google_gmail_access_token = fields.Char(string='Access Token', groups='base.group_system', copy=False)
google_gmail_access_token_expiration = fields.Integer(string='Access Token Expiration Timestamp', groups='base.group_system', copy=False)
google_gmail_uri = fields.Char(compute='_compute_gmail_uri', string='URI', help='The URL to generate the authorization code from Google', groups='base.group_system')
@api.depends('google_gmail_authorization_code')
def _compute_gmail_uri(self):
Config = self.env['ir.config_parameter'].sudo()
google_gmail_client_id = Config.get_param('google_gmail_client_id')
google_gmail_client_secret = Config.get_param('google_gmail_client_secret')
base_url = self.get_base_url()
redirect_uri = url_join(base_url, '/google_gmail/confirm')
if not google_gmail_client_id or not google_gmail_client_secret:
self.google_gmail_uri = False
else:
for record in self:
google_gmail_uri = 'https://accounts.google.com/o/oauth2/v2/auth?%s' % url_encode({
'client_id': google_gmail_client_id,
'redirect_uri': redirect_uri,
'response_type': 'code',
'scope': self._SERVICE_SCOPE,
# access_type and prompt needed to get a refresh token
'access_type': 'offline',
'prompt': 'consent',
'state': json.dumps({
'model': record._name,
'id': record.id or False,
'csrf_token': record._get_gmail_csrf_token() if record.id else False,
})
})
record.google_gmail_uri = google_gmail_uri
def open_google_gmail_uri(self):
"""Open the URL to accept the Gmail permission.
This is done with an action, so we can force the user the save the form.
We need him to save the form so the current mail server record exist in DB, and
we can include the record ID in the URL.
"""
self.ensure_one()
if not self.env.user.has_group('base.group_system'):
raise AccessError(_('Only the administrator can link a Gmail mail server.'))
if not self.google_gmail_uri:
raise UserError(_('Please configure your Gmail credentials.'))
return {
'type': 'ir.actions.act_url',
'url': self.google_gmail_uri,
}
def _fetch_gmail_refresh_token(self, authorization_code):
"""Request the refresh token and the initial access token from the authorization code.
:return:
refresh_token, access_token, access_token_expiration
"""
response = self._fetch_gmail_token('authorization_code', code=authorization_code)
return (
response['refresh_token'],
response['access_token'],
int(time.time()) + response['expires_in'],
)
def _fetch_gmail_access_token(self, refresh_token):
"""Refresh the access token thanks to the refresh token.
:return:
access_token, access_token_expiration
"""
response = self._fetch_gmail_token('refresh_token', refresh_token=refresh_token)
return (
response['access_token'],
int(time.time()) + response['expires_in'],
)
def _fetch_gmail_token(self, grant_type, **values):
"""Generic method to request an access token or a refresh token.
Return the JSON response of the GMail API and manage the errors which can occur.
:param grant_type: Depends the action we want to do (refresh_token or authorization_code)
:param values: Additional parameters that will be given to the GMail endpoint
"""
Config = self.env['ir.config_parameter'].sudo()
google_gmail_client_id = Config.get_param('google_gmail_client_id')
google_gmail_client_secret = Config.get_param('google_gmail_client_secret')
base_url = self.get_base_url()
redirect_uri = url_join(base_url, '/google_gmail/confirm')
response = requests.post(
'https://oauth2.googleapis.com/token',
data={
'client_id': google_gmail_client_id,
'client_secret': google_gmail_client_secret,
'grant_type': grant_type,
'redirect_uri': redirect_uri,
**values,
},
timeout=5,
)
if not response.ok:
raise UserError(_('An error occurred when fetching the access token.'))
return response.json()
def _generate_oauth2_string(self, user, refresh_token):
"""Generate a OAuth2 string which can be used for authentication.
:param user: Email address of the Gmail account to authenticate
:param refresh_token: Refresh token for the given Gmail account
:return: The SASL argument for the OAuth2 mechanism.
"""
self.ensure_one()
now_timestamp = int(time.time())
if not self.google_gmail_access_token \
or not self.google_gmail_access_token_expiration \
or self.google_gmail_access_token_expiration < now_timestamp:
access_token, expiration = self._fetch_gmail_access_token(self.google_gmail_refresh_token)
self.write({
'google_gmail_access_token': access_token,
'google_gmail_access_token_expiration': expiration,
})
_logger.info(
'Google Gmail: fetch new access token. Expires in %i minutes',
(self.google_gmail_access_token_expiration - now_timestamp) // 60)
else:
_logger.info(
'Google Gmail: reuse existing access token. Expire in %i minutes',
(self.google_gmail_access_token_expiration - now_timestamp) // 60)
return 'user=%s\1auth=Bearer %s\1\1' % (user, self.google_gmail_access_token)
def _get_gmail_csrf_token(self):
"""Generate a CSRF token that will be verified in `google_gmail_callback`.
This will prevent a malicious person to make an admin user disconnect the mail servers.
"""
self.ensure_one()
_logger.info('Google Gmail: generate CSRF token for %s #%i', self._name, self.id)
return tools.misc.hmac(
env=self.env(su=True),
scope='google_gmail_oauth',
message=(self._name, self.id),
)
|