1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
|
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
import mimetypes
import requests
import werkzeug.utils
from werkzeug.urls import url_encode
from odoo import http, tools, _
from odoo.http import request
from odoo.tools.mimetypes import guess_mimetype
from odoo.addons.html_editor.controllers.main import HTML_Editor
logger = logging.getLogger(__name__)
class Web_Unsplash(http.Controller):
    """HTTP endpoints used to search Unsplash and import its images as attachments.

    The Unsplash credentials are read from (and written to) the
    ``unsplash.access_key`` and ``unsplash.app_id`` entries of
    ``ir.config_parameter``.
    """

    # Seconds to wait for Unsplash before giving up. Without a timeout,
    # ``requests`` waits forever and a stalled remote server would keep the
    # HTTP worker blocked.
    _UNSPLASH_TIMEOUT = 10

    def _get_access_key(self):
        """ Use this method to get the key, needed for internal reason """
        return request.env['ir.config_parameter'].sudo().get_param('unsplash.access_key')

    def _notify_download(self, url):
        """ Notifies Unsplash from an image download. (API requirement)

        This endpoint should just be pinged with a simple GET request for
        Unsplash to increment the image view counter. The notification is
        best effort: every failure is logged and nothing is ever raised to
        the caller.

        :param url: the download_url of the image to be notified
        :return: None
        """
        try:
            # A missing or non-string URL is handled like any other untrusted
            # URL instead of failing on ``None.startswith``.
            trusted = isinstance(url, str) and url.startswith('https://api.unsplash.com/photos/')
            if not trusted and not request.env.registry.in_test_mode():
                logger.error("Unsplash download notification failed: ERROR: Unknown Unsplash notify URL!")
                return
            access_key = self._get_access_key()
            requests.get(
                url,
                params=url_encode({'client_id': access_key}),
                timeout=self._UNSPLASH_TIMEOUT,
            )
        except Exception as e:
            # Deliberately broad: a failed notification must never break the
            # image import that triggered it.
            logger.exception("Unsplash download notification failed: %s", e)

    # ------------------------------------------------------
    # add unsplash image url
    # ------------------------------------------------------
    @http.route('/web_unsplash/attachment/add', type='json', auth='user', methods=['POST'])
    def save_unsplash_url(self, unsplashurls=None, **kwargs):
        """ Download the given Unsplash images and store them as attachments.

        unsplashurls = {
            image_id1: {
                url: image_url,
                download_url: download_url,
            },
            image_id2: {
                url: image_url,
                download_url: download_url,
            },
            .....
        }

        Optional keys read from ``kwargs``: ``query`` (used to build the
        attachment name), ``res_model`` (defaults to ``'ir.ui.view'``) and
        ``res_id`` (only used when ``res_model`` is not ``'ir.ui.view'``).
        An optional ``description`` entry of an image is copied on its
        attachment.

        :return: a list with the ``_get_media_info()`` result of every
            attachment that was created; images that could not be downloaded
            are skipped
        :raise Exception: if an image URL is not hosted on a known Unsplash
            domain (this check is bypassed in test mode)
        """
        def slugify(s):
            ''' Keeps only alphanumeric characters, hyphens and spaces from a string.

            The string will also be truncated to 1024 characters max.
            :param s: the string to be filtered
            :return: the sanitized string
            '''
            return "".join(c for c in s if c.isalnum() or c in ('-', ' '))[:1024]

        if not unsplashurls:
            return []

        # ``query`` may be sent as JSON null: fall back on an empty string.
        query = slugify(kwargs.get('query') or '')

        res_model = kwargs.get('res_model', 'ir.ui.view')
        if res_model != 'ir.ui.view' and kwargs.get('res_id'):
            res_id = int(kwargs['res_id'])
        else:
            res_id = None

        trusted_prefixes = ('https://images.unsplash.com/', 'https://plus.unsplash.com/')
        uploads = []
        for key, value in unsplashurls.items():
            url = value.get('url')
            trusted = isinstance(url, str) and url.startswith(trusted_prefixes)
            if not trusted and not request.env.registry.in_test_mode():
                # No exception is being handled here, so ``logger.exception``
                # would only append a meaningless "NoneType: None" traceback.
                logger.error("ERROR: Unknown Unsplash URL!: %s", url)
                raise Exception(_("ERROR: Unknown Unsplash URL!"))

            try:
                req = requests.get(url, timeout=self._UNSPLASH_TIMEOUT)
            except requests.exceptions.ConnectionError as e:
                logger.exception("Connection Error: %s", e)
                continue
            except requests.exceptions.Timeout as e:
                logger.exception("Timeout: %s", e)
                continue
            if req.status_code != requests.codes.ok:
                continue

            image = tools.image_process(req.content, verify_resolution=True)
            # The Unsplash URL doesn't contain the mime-type of the image, so
            # it is guessed from the downloaded content.
            mimetype = guess_mimetype(image)
            # Append the image extension to the name. This uses a per-image
            # variable: extending ``query`` itself would pile up one more
            # extension for every image handled by the loop.
            name = query + (mimetypes.guess_extension(mimetype) or '')

            # /unsplash/5gR788gfd/lion
            url_frags = ['unsplash', key, name]

            attachment_data = {
                'name': '_'.join(url_frags),
                'url': '/' + '/'.join(url_frags),
                'data': image,
                'res_id': res_id,
                'res_model': res_model,
            }
            attachment = HTML_Editor._attachment_create(self, **attachment_data)
            if value.get('description'):
                attachment.description = value.get('description')
            attachment.generate_access_token()
            uploads.append(attachment._get_media_info())

            # Notifies Unsplash from an image download. (API requirement)
            self._notify_download(value.get('download_url'))

        return uploads

    @http.route("/web_unsplash/fetch_images", type='json', auth="user")
    def fetch_unsplash_images(self, **post):
        """ Forward a photo search to the Unsplash API.

        :param post: the search parameters, sent as-is to Unsplash together
            with the configured access key (as ``client_id``)
        :return: the decoded JSON answer of Unsplash on success, otherwise a
            dict with a single ``error`` key: ``'no_access'`` when the user
            cannot manage the Unsplash settings, ``'key_not_found'`` when the
            credentials are not configured, or the HTTP status code returned
            by Unsplash
        """
        access_key = self._get_access_key()
        app_id = self.get_unsplash_app_id()
        if not access_key or not app_id:
            if not request.env.user._can_manage_unsplash_settings():
                return {'error': 'no_access'}
            return {'error': 'key_not_found'}

        post['client_id'] = access_key
        response = requests.get(
            'https://api.unsplash.com/search/photos/',
            params=url_encode(post),
            timeout=self._UNSPLASH_TIMEOUT,
        )
        if response.status_code == requests.codes.ok:
            return response.json()

        if not request.env.user._can_manage_unsplash_settings():
            return {'error': 'no_access'}
        return {'error': response.status_code}

    @http.route("/web_unsplash/get_app_id", type='json', auth="public")
    def get_unsplash_app_id(self, **post):
        """ Return the value of the ``unsplash.app_id`` configuration parameter. """
        return request.env['ir.config_parameter'].sudo().get_param('unsplash.app_id')

    @http.route("/web_unsplash/save_unsplash", type='json', auth="user")
    def save_unsplash(self, **post):
        """ Store the Unsplash credentials in the configuration parameters.

        :param post: expects the ``appId`` and ``key`` entries
        :return: True once both parameters are written
        :raise werkzeug.exceptions.NotFound: if the user is not allowed to
            manage the Unsplash settings
        """
        # Imported here because the module only imports ``werkzeug.utils``;
        # this avoids relying on another module having loaded
        # ``werkzeug.exceptions`` beforehand.
        from werkzeug.exceptions import NotFound

        if request.env.user._can_manage_unsplash_settings():
            config = request.env['ir.config_parameter'].sudo()
            config.set_param('unsplash.app_id', post.get('appId'))
            config.set_param('unsplash.access_key', post.get('key'))
            return True
        raise NotFound()
|