1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
# This file is part of Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
import json
from trytond.protocols.wrappers import (
HTTPStatus, Response, abort, with_pool, with_transaction)
from trytond.wsgi import app
@app.route(
'/<database_name>/inbound_email/inbox/<identifier>', methods={'POST'})
@with_pool
@with_transaction()
def inbound_email(request, pool, identifier):
Inbox = pool.get('inbound.email.inbox')
Email = pool.get('inbound.email')
try:
inbox, = Inbox.search([
('identifier', '=', identifier),
])
except ValueError:
abort(HTTPStatus.NOT_FOUND)
data_type = request.args.get('type', 'raw')
if request.form:
data = json.dumps(request.form.to_dict()).encode()
else:
data = request.data
emails = Email.from_webhook(inbox, data, data_type)
if not emails:
abort(HTTPStatus.BAD_REQUEST)
for email in emails:
inbox.process(email)
Email.save(emails)
return Response(status=HTTPStatus.NO_CONTENT)
|